This is an automated email from the ASF dual-hosted git repository.

xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new 669cae0  update dlpredictor
     new 1e66a17  Merge pull request #4 from radibnia77/main
669cae0 is described below

commit 669cae028f642e0c4a31614a18a9d70a29d696b6
Author: Reza <[email protected]>
AuthorDate: Wed Apr 21 16:58:01 2021 -0700

    update dlpredictor
---
 Processes/dlpredictor/README.md                    |   2 +-
 Processes/dlpredictor/conf/config.yml              |  20 ++--
 Processes/dlpredictor/dag/dlpredictor_dag.py       |  75 +++++++++++++
 .../dlpredictor/{log.py => configutil.py}          |  29 +++--
 Processes/dlpredictor/dlpredictor/log.py           |   6 +-
 Processes/dlpredictor/dlpredictor/main_es_push.py  |  83 ++++++++++++++
 Processes/dlpredictor/dlpredictor/main_spark_es.py | 119 +++++++++------------
 Processes/dlpredictor/dlpredictor/show_config.py   |  89 +++++++++++++++
 .../dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar | Bin 0 -> 1012419 bytes
 Processes/dlpredictor/run.sh                       |  14 ++-
 Processes/dlpredictor/setup.py                     |  16 ---
 11 files changed, 345 insertions(+), 108 deletions(-)

diff --git a/Processes/dlpredictor/README.md b/Processes/dlpredictor/README.md
index 171fbbf..4de8fcd 100644
--- a/Processes/dlpredictor/README.md
+++ b/Processes/dlpredictor/README.md
@@ -14,7 +14,7 @@ pip install -r requirements.txt
 2.     Transfer the dlpredictor directory to ~/code/dlpredictor on a machine which also has Spark Client.
 3.  cd dlpredictor
 4.  pip install -r requirements.txt (to install required packages)
-5.  python setup install (to install predictor_dl_model package)
+5.  python setup.py install (to install predictor_dl_model package)
 6.     Run run.sh 
 
 ### Documentation
diff --git a/Processes/dlpredictor/conf/config.yml b/Processes/dlpredictor/conf/config.yml
index b10ebcb..cbb5513 100644
--- a/Processes/dlpredictor/conf/config.yml
+++ b/Processes/dlpredictor/conf/config.yml
@@ -1,12 +1,18 @@
-log_level: 'INFO'
-product_tag: 'dlpm'
-pipeline_tag: '11092020'
-factdata_table: 'factdata_hq_09222020_bucket_0'
-distribution_table: '{product_tag}_{pipeline_tag}_tmp_distribution'
-norm_table: '{product_tag}_{pipeline_tag}_trainready'
-model_stat_table: '{product_tag}_{pipeline_tag}_model_stat'
+log_level: 'WARN'
+product_tag: 'dlpredictor'
+pipeline_tag: '04212021'
+
+#input tables from dlpm pipeline
+factdata_table: 'factdata_hq_09222020_r_ipl_mapped_11052020' # this looks like dlpm_03182021
+distribution_table: 'dlpm_03182021_tmp_distribution'
+norm_table: 'dlpm_03182021_trainready'
+model_stat_table: 'dlpm_03182021_model_stat'
 bucket_size: 10
 bucket_step: 1
+
+yesterday: '2020-05-31'
+serving_url: 'http://10.193.217.105:8505/v1/models/dlpm6:predict'
+
 es_host: '10.213.37.41'
 es_port: '9200'
 es_predictions_index: '{product_tag}_{pipeline_tag}_predictions'
diff --git a/Processes/dlpredictor/dag/dlpredictor_dag.py b/Processes/dlpredictor/dag/dlpredictor_dag.py
new file mode 100644
index 0000000..6fd74e0
--- /dev/null
+++ b/Processes/dlpredictor/dag/dlpredictor_dag.py
@@ -0,0 +1,75 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from airflow import DAG
+import datetime as dt
+from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
+from datetime import timedelta
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+    'owner': 'dlpredictor',
+    'depends_on_past': False,
+    'start_date': dt.datetime(2021, 3, 15),
+    'retries': 0,
+    'retry_delay': timedelta(minutes=1),
+}
+
+dag = DAG(
+    'dlpredictor',
+    default_args=default_args,
+    schedule_interval=None,
+
+)
+
+def sparkOperator(
+        file,
+        task_id,
+        executor_cores=5,
+        num_executors=10,
+        **kwargs
+):
+        return SparkSubmitOperator(
+                
application='/home/airflow/airflow-apps/dlpredictor/dlpredictor/{}'.format(file),
+                
application_args=['/home/airflow/airflow-apps/dlpredictor/conf/config.yml'],
+                conn_id='spark_default',
+                executor_memory='16G',
+                conf={'spark.driver.maxResultSize': '8g'},
+                driver_memory='16G',
+                executor_cores=executor_cores,
+                num_executors=num_executors,
+                task_id=task_id,
+                dag=dag,
+                **kwargs
+        )
+
+
+show_config = sparkOperator('show_config.py', 'show_config')
+
+dlpredictor = sparkOperator('main_spark_es.py',
+        'dlpredictor',
+       
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
+       
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')
+
+es_push = sparkOperator('main_es_push.py',
+        'es_push',
+        3,
+        3,
+       
py_files='/home/airflow/airflow-apps/dlpredictor/dist/dlpredictor-1.6.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/imscommon-2.0.0-py2.7.egg,/home/airflow/airflow-apps/dlpredictor/lib/predictor_dl_model-1.6.0-py2.7.egg',
+       
jars='/home/airflow/airflow-apps/dlpredictor/lib/elasticsearch-hadoop-6.8.0.jar')
+
+show_config >> dlpredictor >> es_push
\ No newline at end of file
diff --git a/Processes/dlpredictor/dlpredictor/log.py b/Processes/dlpredictor/dlpredictor/configutil.py
similarity index 52%
copy from Processes/dlpredictor/dlpredictor/log.py
copy to Processes/dlpredictor/dlpredictor/configutil.py
index 1190a7c..7a069b0 100644
--- a/Processes/dlpredictor/dlpredictor/log.py
+++ b/Processes/dlpredictor/dlpredictor/configutil.py
@@ -14,14 +14,25 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import logging.config
-import os
+import re
 
-path = 'conf/log.conf'
+def resolve_placeholder(in_dict):
+    stack = []
+    for key in in_dict.keys():
+        stack.append((in_dict, key))
+    while len(stack) > 0:
+        (_dict, key) = stack.pop()
+        value = _dict[key]
+        if type(value) == dict:
+            for _key in value.keys():
+                stack.append((value, _key))
+        elif type(value) == str:
+            z = re.findall('\{(.*?)\}', value)
+            if len(z) > 0:
+                new_value = value
+                for item in z:
+                    if item in in_dict and type(in_dict[item]) == str:
+                        new_value = new_value.replace(
+                            '{'+item+'}', in_dict[item])
+                _dict[key] = new_value
 
-logging.config.fileConfig(path)
-logger_operation = logging.getLogger('operation')
-logger_run = logging.getLogger('run')
-logger_security = logging.getLogger('security')
-logger_user = logging.getLogger('user')
-logger_interface = logging.getLogger('interface')
diff --git a/Processes/dlpredictor/dlpredictor/log.py b/Processes/dlpredictor/dlpredictor/log.py
index 1190a7c..5da9479 100644
--- a/Processes/dlpredictor/dlpredictor/log.py
+++ b/Processes/dlpredictor/dlpredictor/log.py
@@ -17,9 +17,13 @@
 import logging.config
 import os
 
+# Get the base path of this installation.
+# Assuming that this file is packaged in a .egg file.
+basedir = 
os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
 path = 'conf/log.conf'
+fullpath = os.path.join(basedir, path)
 
-logging.config.fileConfig(path)
+logging.config.fileConfig(fullpath)
 logger_operation = logging.getLogger('operation')
 logger_run = logging.getLogger('run')
 logger_security = logging.getLogger('security')
diff --git a/Processes/dlpredictor/dlpredictor/main_es_push.py b/Processes/dlpredictor/dlpredictor/main_es_push.py
new file mode 100644
index 0000000..351bfb6
--- /dev/null
+++ b/Processes/dlpredictor/dlpredictor/main_es_push.py
@@ -0,0 +1,83 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import argparse
+import sys
+import yaml
+
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+from dlpredictor.configutil import *
+from dlpredictor.log import *
+from dlpredictor import transform
+
+
+def run (cfg):
+    sc = SparkContext()
+    hive_context = HiveContext(sc)
+    sc.setLogLevel(cfg['log_level'])
+
+    # Load the data frame from Hive.
+    table_name = cfg['es_predictions_index']
+    command = """select * from {}""".format(table_name)
+    df = hive_context.sql(command)
+
+    # Select the columns to push to elasticsearch.
+    rdd = df.rdd.map(lambda x: transform.format_data(x, 'ucdoc'))
+
+    # Write the data frame to elasticsearch.
+    es_write_conf = {"es.nodes": cfg['es_host'],
+                     "es.port": cfg['es_port'],
+                     "es.resource": 
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
+                     "es.batch.size.bytes": "1000000",
+                     "es.batch.size.entries": "100",
+                     "es.input.json": "yes",
+                     "es.mapping.id": "uckey",
+                     "es.nodes.wan.only": "true",
+                     "es.write.operation": "upsert"}
+    rdd.saveAsNewAPIHadoopFile(
+        path='-',
+        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+        keyClass="org.apache.hadoop.io.NullWritable",
+        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+        conf=es_write_conf)
+
+    sc.stop()
+
+
+if __name__ == '__main__':
+
+    # Get the execution parameters.
+    parser = argparse.ArgumentParser(description='Prepare data')
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+
+    # Load config file.
+    try:
+        with open(args.config_file, 'r') as ymlfile:
+            cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
+            resolve_placeholder(cfg)
+            logger_operation.info("Successfully open 
{}".format(args.config_file))
+    except IOError as e:
+        logger_operation.error("Open config file unexpected error: I/O 
error({0}): {1}".format(e.errno, e.strerror))
+    except:
+        logger_operation.error("Unexpected error:{}".format(sys.exc_info()[0]))
+        raise
+
+    # Run this module.
+    run(cfg)
+
diff --git a/Processes/dlpredictor/dlpredictor/main_spark_es.py b/Processes/dlpredictor/dlpredictor/main_spark_es.py
index c51ea37..a8be99f 100644
--- a/Processes/dlpredictor/dlpredictor/main_spark_es.py
+++ b/Processes/dlpredictor/dlpredictor/main_spark_es.py
@@ -15,43 +15,23 @@
 #  limitations under the License.
 
 import argparse
-import re
 # -*- coding: UTF-8 -*-
 import sys
 from datetime import datetime, timedelta
-
 import yaml
+
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
 from pyspark.sql.functions import udf, expr, collect_list, struct
 from pyspark.sql.types import StringType, ArrayType, MapType, FloatType, 
StructField, StructType
 
 from dlpredictor import transform
+from dlpredictor.configutil import *
 from dlpredictor.log import *
 from dlpredictor.prediction.forecaster import Forecaster
 from dlpredictor.util.sparkesutil import *
 
 
-def resolve_placeholder(in_dict):
-    stack = []
-    for key in in_dict.keys():
-        stack.append((in_dict, key))
-    while len(stack) > 0:
-        (_dict, key) = stack.pop()
-        value = _dict[key]
-        if type(value) == dict:
-            for _key in value.keys():
-                stack.append((value, _key))
-        elif type(value) == str:
-            z = re.findall('\{(.*?)\}', value)
-            if len(z) > 0:
-                new_value = value
-                for item in z:
-                    if item in in_dict and type(in_dict[item]) == str:
-                        new_value = new_value.replace('{'+item+'}', 
in_dict[item])
-                _dict[key] = new_value
-
-
 def sum_count_array(hour_counts):
     '''
     [{14: [u'1:3']}, {13: [u'1:3']}, {11: [u'1:3']}, {15: [u'1:5']}, {22: 
[u'1:8']}, {23: [u'1:6']}, {19: [u'1:1']}, {18: [u'1:1']}, {12: [u'1:5']}, {17: 
[u'1:5']}, {20: [u'1:3']}, {21: [u'1:21']}]
@@ -122,20 +102,20 @@ def __save_as_table(df, table_name, hive_context, create_table):
         hive_context.sql(command)
 
 
-def run(cfg, yesterday, serving_url):
+def run(cfg):
 
     # os.environ[
     #     'PYSPARK_SUBMIT_ARGS'] = '--jars 
/home/reza/eshadoop/elasticsearch-hadoop-6.5.2/dist/elasticsearch-hadoop-6.5.2.jar
 pyspark-shell'
 
-    es_write_conf = {"es.nodes": cfg['es_host'],
-                     "es.port": cfg['es_port'],
-                     "es.resource": 
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
-                     "es.batch.size.bytes": "1000000",
-                     "es.batch.size.entries": "100",
-                     "es.input.json": "yes",
-                     "es.mapping.id": "uckey",
-                     "es.nodes.wan.only": "true",
-                     "es.write.operation": "upsert"}
+    # es_write_conf = {"es.nodes": cfg['es_host'],
+    #                  "es.port": cfg['es_port'],
+    #                  "es.resource": 
cfg['es_predictions_index']+'/'+cfg['es_predictions_type'],
+    #                  "es.batch.size.bytes": "1000000",
+    #                  "es.batch.size.entries": "100",
+    #                  "es.input.json": "yes",
+    #                  "es.mapping.id": "uckey",
+    #                  "es.nodes.wan.only": "true",
+    #                  "es.write.operation": "upsert"}
 
     sc = SparkContext()
     hive_context = HiveContext(sc)
@@ -151,6 +131,9 @@ def run(cfg, yesterday, serving_url):
     traffic_dist = cfg['traffic_dist']
     model_stat_table = cfg['model_stat_table']
 
+    yesterday = cfg['yesterday']
+    serving_url = cfg['serving_url']
+
     model_stats = get_model_stats(hive_context, model_stat_table)
 
     # Read dist
@@ -163,6 +146,9 @@ def run(cfg, yesterday, serving_url):
         FROM {} AS DIST
         """.format(distribution_table)
     df_dist = hive_context.sql(command)
+    df_dist = df_dist.repartition('uckey')
+    df_dist.cache()
+    df_dist.count()
 
     # Read norm table
     # DataFrame[uckey: string, ts: array<int>, p: float, a__n: float, a_1_n: 
float, a_2_n: float, a_3_n: float, a_4_n: float, a_5_n: float, a_6_n: float, 
t_UNKNOWN_n: float, t_3G_n: float, t_4G_n: float, t_WIFI_n: float, t_2G_n: 
float, g__n: float, g_g_f_n: float, g_g_m_n: float, g_g_x_n: float, 
price_cat_1_n: float, price_cat_2_n: float, price_cat_3_n: float, si_vec_n: 
array<float>, r_vec_n: array<float>, p_n: float, ts_n: array<float>]
@@ -216,38 +202,33 @@ def run(cfg, yesterday, serving_url):
 
         df = hive_context.sql(command)
 
+        # add partition_group
+        df = df.repartition("uckey")
+
         # [Row(count_array=[u'1:504'], day=u'2019-11-02', hour=2, 
uckey=u'magazinelock,04,WIFI,g_m,1,CPM,78', hour_price_imp_map={2: [u'1:504']})]
-        df = df.withColumn('hour_price_imp_map',
-                           expr("map(hour, count_array)"))
+        df = df.withColumn('hour_price_imp_map', expr("map(hour, 
count_array)"))
 
         # 
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19', 
day=u'2019-11-02', hour_price_imp_map_list=[{15: [u'3:3']}, {7: [u'3:5']}, {10: 
[u'3:3']}, {9: [u'3:1']}, {16: [u'3:2']}, {22: [u'3:11']}, {23: [u'3:3']}, {18: 
[u'3:7']}, {0: [u'3:4']}, {1: [u'3:2']}, {19: [u'3:10']}, {8: [u'3:4']}, {21: 
[u'3:2']}, {6: [u'3:1']}])]
-        df = df.groupBy('uckey', 'day').agg(
-            
collect_list('hour_price_imp_map').alias('hour_price_imp_map_list'))
+        df = df.groupBy('uckey', 
'day').agg(collect_list('hour_price_imp_map').alias('hour_price_imp_map_list'))
 
         # 
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19', 
day=u'2019-11-02', day_price_imp=[u'3:58'])]
         df = df.withColumn('day_price_imp', udf(
             sum_count_array, 
ArrayType(StringType()))(df.hour_price_imp_map_list)).drop('hour_price_imp_map_list')
 
         # 
[Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19', 
day=u'2019-11-02', day_price_imp=[u'3:58'], day_price_imp_map={u'2019-11-02': 
[u'3:58']})]
-        df = df.withColumn('day_price_imp_map', expr(
-            "map(day, day_price_imp)"))
+        df = df.withColumn('day_price_imp_map', expr("map(day, 
day_price_imp)"))
 
         # [Row(uckey=u'native,z041bf6g4s,WIFI,g_f,1,CPM,71', 
day_price_imp_map_list=[{u'2019-11-02': [u'1:2', u'2:261']}, {u'2019-11-03': 
[u'2:515']}])])
-        df = df.groupBy('uckey').agg(collect_list(
-            'day_price_imp_map').alias('day_price_imp_map_list'))
+        df = 
df.groupBy('uckey').agg(collect_list('day_price_imp_map').alias('day_price_imp_map_list'))
 
         # [Row(uckey=u'native,z041bf6g4s,WIFI,g_f,1,CPM,71', 
day_price_imp_map_list=[{u'2019-11-02': [u'1:2', u'2:261']}, {u'2019-11-03': 
[u'2:515']}], ratio=0.09467455744743347, cluster_uckey=u'892', price_cat=u'1')]
         df = df.join(df_dist, on=['uckey'], how='inner')
 
         # df_uckey_cluster keeps the ratio and cluster_key for only uckeys 
that are being processed
         if not df_uckey_cluster:
-            df_uckey_cluster = df.select(
-                'uckey', 'cluster_uckey', 'ratio', 'price_cat')
-            df_uckey_cluster.cache()
+            df_uckey_cluster = df.select('uckey', 'cluster_uckey', 'ratio', 
'price_cat')
         else:
-            df_uckey_cluster = df.select(
-                'uckey', 'cluster_uckey', 'ratio', 
'price_cat').union(df_uckey_cluster)
-            df_uckey_cluster.cache()
+            df_uckey_cluster = df.select('uckey', 'cluster_uckey', 'ratio', 
'price_cat').union(df_uckey_cluster)
 
         # [Row(cluster_uckey=u'2469', price_cat=u'2', 
cluster_day_price_imp_list=[[{u'2019-11-02': [u'2:90']}, {u'2019-11-03': 
[u'2:172']}]])])
         df = df.groupBy('cluster_uckey', 'price_cat').agg(
@@ -261,18 +242,15 @@ def run(cfg, yesterday, serving_url):
 
         if not df_prediction_ready:
             df_prediction_ready = df
-            df_prediction_ready.cache()
         else:
             df = df_prediction_ready.union(df)
-            df = df.groupBy('cluster_uckey', 'price_cat').agg(
-                collect_list('ts').alias('ts_list'))
+            df = df.groupBy('cluster_uckey', 
'price_cat').agg(collect_list('ts').alias('ts_list'))
             df = df.withColumn('ts', udf(sum_day_count_array,
                                          ArrayType(MapType(StringType(), 
ArrayType(StringType()))))(df.ts_list))
             df = df.drop('ts_list')
 
             # [Row(cluster_uckey=u'magazinelock,03,WIFI,g_f,1,CPM,60', 
ts=[{u'2019-11-02': [u'1:2']}])]
             df_prediction_ready = df
-            df_prediction_ready.cache()
 
     # [Row(cluster_uckey=u'1119', price_cat=u'2', ts=[{u'2019-11-02': 
[u'1:862', u'3:49', u'2:1154'], u'2019-11-03': [u'1:596', u'3:67', 
u'2:1024']}])]
     df = df_prediction_ready
@@ -280,18 +258,15 @@ def run(cfg, yesterday, serving_url):
     df = df.join(df_norm, on=['cluster_uckey', 'price_cat'], how='inner')
 
     # [Row(cluster_uckey=u'1119', price_cat=u'2', ts=[{u'2019-11-02': 
[u'1:862', u'3:49', u'2:1154'], u'2019-11-03': [u'1:596', u'3:67', 
u'2:1024']}], a__n=-0.005224577616900206, a_1_n=0.6089736819267273, 
a_2_n=-0.21013110876083374, a_3_n=0.16884993016719818, 
a_4_n=-0.3416250944137573, a_5_n=0.15184317529201508, 
a_6_n=-0.16529197990894318, t_UNKNOWN_n=-0.4828081429004669, 
t_3G_n=1.2522615194320679, t_4G_n=-0.15080969035625458, 
t_WIFI_n=-0.35078370571136475, t_2G_n=1.991615653038025, g__n [...]
-    df = df.join(df_uckey_cluster, on=[
-                 'cluster_uckey', 'price_cat'], how='inner')
+    df = df.join(df_uckey_cluster, on=['cluster_uckey', 'price_cat'], 
how='inner')
 
     predictor_udf = udf(transform.predict_daily_uckey(days=day_list,
                                                       serving_url=serving_url, 
forecaster=forecaster, model_stats=model_stats, columns=df.columns), 
MapType(StringType(), FloatType()))
 
-    df = df.withColumn('day_prediction_map',
-                       predictor_udf(struct([df[name] for name in 
df.columns])))
+    df = df.withColumn('day_prediction_map', predictor_udf(struct([df[name] 
for name in df.columns])))
 
     # [Row(cluster_uckey=u'1119', price_cat=u'2', 
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}, 
ratio=0.11989551782608032, 
uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')]
-    df = df.select('cluster_uckey', 'price_cat',
-                   'day_prediction_map', 'ratio', 'uckey')
+    df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map', 
'ratio', 'uckey')
 
     # [Row(ucdoc_elements=Row(price_cat=u'2', ratio=0.11989551782608032, 
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}), 
uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')]
     ucdoc_elements_type = StructType([StructField('price_cat', StringType(), 
False), StructField(
@@ -300,19 +275,25 @@ def run(cfg, yesterday, serving_url):
                                                            (price_cat, ratio, 
day_prediction_map), ucdoc_elements_type)(df.price_cat, df.ratio, 
df.day_prediction_map)).select('ucdoc_elements_pre_price_cat', 'uckey')
 
     # [Row(uckey=u'splash,d971z9825e,WIFI,g_m,1,CPT,74', 
ucdoc_elements=[Row(price_cat=u'1', ratio=0.5007790923118591, 
day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0})])]
-    df = 
df.groupBy('uckey').agg(collect_list('ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
+    df = df.groupBy('uckey').agg(collect_list(
+        'ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
 
     df = df.withColumn('prediction_output', 
udf(transform.generate_ucdoc(traffic_dist), StringType())(
         df.uckey, df.ucdoc_elements))
 
     df_predictions_doc = df.select('uckey', 'prediction_output')
-    rdd = df_predictions_doc.rdd.map(lambda x: transform.format_data(x, 
'ucdoc'))
-    rdd.saveAsNewAPIHadoopFile(
-        path='-',
-        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
-        keyClass="org.apache.hadoop.io.NullWritable",
-        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
-        conf=es_write_conf)
+
+    # Save the predictions to Hive.
+    table_name = cfg['es_predictions_index']
+    df_predictions_doc.write.option("header", "true").option("encoding", 
"UTF-8").mode('overwrite').format('hive').saveAsTable(table_name)
+
+    # rdd = df_predictions_doc.rdd.map(lambda x: transform.format_data(x, 
'ucdoc'))
+    # rdd.saveAsNewAPIHadoopFile(
+    #     path='-',
+    #     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
+    #     keyClass="org.apache.hadoop.io.NullWritable",
+    #     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
+    #     conf=es_write_conf)
 
     sc.stop()
 
@@ -321,8 +302,6 @@ if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Prepare data')
     parser.add_argument('config_file')
-    parser.add_argument('yesterday')
-    parser.add_argument('serving_url')
     args = parser.parse_args()
 
     # Load config file
@@ -330,13 +309,11 @@ if __name__ == '__main__':
         with open(args.config_file, 'r') as ymlfile:
             cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
             resolve_placeholder(cfg)
-            logger_operation.info(
-                "Successfully open {}".format(args.config_file))
+            logger_operation.info("Successfully open 
{}".format(args.config_file))
     except IOError as e:
-        logger_operation.error(
-            "Open config file unexpected error: I/O error({0}): 
{1}".format(e.errno, e.strerror))
+        logger_operation.error("Open config file unexpected error: I/O 
error({0}): {1}".format(e.errno, e.strerror))
     except:
         logger_operation.error("Unexpected error:{}".format(sys.exc_info()[0]))
         raise
 
-    run(cfg, args.yesterday, args.serving_url)
+    run(cfg)
diff --git a/Processes/dlpredictor/dlpredictor/show_config.py b/Processes/dlpredictor/dlpredictor/show_config.py
new file mode 100644
index 0000000..cd66fdf
--- /dev/null
+++ b/Processes/dlpredictor/dlpredictor/show_config.py
@@ -0,0 +1,89 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import yaml
+import argparse
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+def run(cfg, hive_context):
+
+    log_level = cfg['log_level']
+    product_tag = cfg['product_tag']
+    pipeline_tag = cfg['pipeline_tag']
+
+    factdata = cfg['factdata_table']
+    distribution_table = cfg['distribution_table']
+    norm_table = cfg['norm_table']
+    model_stat_table = cfg['model_stat_table']
+    bucket_size = cfg['bucket_size']
+    bucket_step = cfg['bucket_step']
+
+    es_host = cfg['es_host']
+    es_port = cfg['es_port']
+    es_predictions_index = cfg['es_predictions_index']
+    es_predictions_type = cfg['es_predictions_type']
+    holiday_list = cfg['holiday_list']
+    traffic_dist = cfg['traffic_dist']
+
+    for key in cfg:
+        print('{}:  {}'.format(key, cfg[key]))
+    print('')
+
+    print('Output index:')
+    print(es_predictions_index.format(product_tag=product_tag, 
pipeline_tag=pipeline_tag))
+    print('')
+
+    command = "SELECT * FROM {}"
+    df = hive_context.sql(command.format(factdata))
+    df_factdata_schema = df.schema
+    print('Factdata schema')
+    df.printSchema()
+
+    command = "SELECT * FROM {}"
+    df = hive_context.sql(command.format(distribution_table))
+    df_distribution_schema = df.schema
+    print('Distribution schema')
+    df.printSchema()
+
+    command = "SELECT * FROM {}"
+    df = hive_context.sql(command.format(norm_table))
+    df_norm_schema = df.schema
+    print('Norm schema')
+    df.printSchema()
+
+    command = "SELECT * FROM {}"
+    df = hive_context.sql(command.format(model_stat_table))
+    df_model_schema = df.schema
+    print('Model stat schema')
+    df.printSchema()
+
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description="DLPredictor")
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+    with open(args.config_file, 'r') as yml_file:
+        cfg = yaml.safe_load(yml_file)
+
+    sc = SparkContext.getOrCreate()
+    sc.setLogLevel('WARN')
+    hive_context = HiveContext(sc)
+
+
+    run(cfg=cfg, hive_context= hive_context)
\ No newline at end of file
diff --git a/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar b/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar
new file mode 100644
index 0000000..e07213d
Binary files /dev/null and b/Processes/dlpredictor/lib/elasticsearch-hadoop-7.6.2.jar differ
diff --git a/Processes/dlpredictor/run.sh b/Processes/dlpredictor/run.sh
index a5c5f64..2500d54 100644
--- a/Processes/dlpredictor/run.sh
+++ b/Processes/dlpredictor/run.sh
@@ -1,9 +1,17 @@
 #!/bin/bash
 
-#Start the predictor
+SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+
+# Start the predictor
 if true
 then
     # spark-submit --num-executors 10 --executor-cores 5 --jars 
lib/elasticsearch-hadoop-6.5.2.jar dlpredictor/main_spark_es.py conf/config.yml 
'2019-11-03' 's32' '1' 'http://10.193.217.105:8501/v1/models/faezeh:predict'  
     # spark-submit --master yarn --py-files 
dist/dlpredictor-2.0.0-py2.7.egg,dist/imscommon-2.0.0-py2.7.egg,dist/predictor_dl_model-1.0.0-py2.7.egg
 --num-executors 3 --executor-cores 3 --jars lib/elasticsearch-hadoop-6.8.0.jar 
dlpredictor/main_spark_es.py conf/config.yml '2020-02-08' 's32' '1' 
'http://10.193.217.105:8501/v1/models/faezeh:predict'
-    spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 16G --driver-memory 16G --py-files 
dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg
 --jars lib/elasticsearch-hadoop-6.8.0.jar dlpredictor/main_spark_es.py 
conf/config.yml '2020-05-31' 'http://10.193.217.105:8503/v1/models/dl3:predict'
-fi
\ No newline at end of file
+    spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 16G --driver-memory 16G --py-files 
$SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg,$SCRIPTPATH/lib/predictor_dl_model-1.6.0-py2.7.egg
 --jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar 
$SCRIPTPATH/dlpredictor/main_spark_es.py $SCRIPTPATH/conf/config.yml
+fi
+
+# Push the data to elasticsearch
+if true
+then
+    spark-submit --master yarn --num-executors 3 --executor-cores 3 
--executor-memory 16G --driver-memory 16G --py-files 
$SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg,$SCRIPTPATH/lib/imscommon-2.0.0-py2.7.egg
 --jars $SCRIPTPATH/lib/elasticsearch-hadoop-6.8.0.jar 
$SCRIPTPATH/dlpredictor/main_es_push.py $SCRIPTPATH/conf/config.yml
+fi
diff --git a/Processes/dlpredictor/setup.py b/Processes/dlpredictor/setup.py
index 27d2291..cb3f5d1 100644
--- a/Processes/dlpredictor/setup.py
+++ b/Processes/dlpredictor/setup.py
@@ -1,19 +1,3 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
- 
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
 from setuptools import setup, find_packages
 
 with open("README.md", "r") as fh:

Reply via email to