This is an automated email from the ASF dual-hosted git repository.

xunh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git


The following commit(s) were added to refs/heads/main by this push:
     new e9316a4  Update lookalike application
     new 371e4b2  Merge pull request #13 from radibnia77/main
e9316a4 is described below

commit e9316a40cac096e6de44730e1c49acd448e899c0
Author: Reza <reza_a...@yahoo.com>
AuthorDate: Thu Sep 16 15:09:50 2021 -0700

    Update lookalike application
    
    These are changes for lookalike application
    1. User consolidation for fast processing
    2. Add Integration tests
    3. Refactoring
---
 .../application/pipeline/dags => }/README.md       |   0
 Model/lookalike-model/doc/ssd-v2.pdf               | Bin 406365 -> 0 bytes
 Model/lookalike-model/doc/ssd-v3.pdf               | Bin 0 -> 431277 bytes
 .../{pipeline/dags/README.md => __init__.py}       |   0
 .../application/legacy_files/distance_table.py     |  95 -------
 .../legacy_files/distance_table_list.py            | 144 -----------
 .../application/pipeline/__init__.py               |  15 ++
 .../application/pipeline/config.yml                |  34 ++-
 .../lookalike_model/application/pipeline/run.sh    |  14 +-
 .../application/pipeline/score_generator.py        |  21 +-
 ...vector_rebucketing.py => score_matrix_table.py} |  47 ++--
 .../application/pipeline/score_vector_table.py     |  17 +-
 .../application/pipeline/seed_user_selector.py     |  64 -----
 .../pipeline/top_n_similarity_table_generator.py   | 111 ++++----
 .../lookalike_model/application/rest_client.py     |  89 -------
 .../validation_plan_0.py}                          |  10 +-
 .../application/validations/validation_plan_1.py   |  94 +++++++
 Model/lookalike-model/lookalike_model/config.yml   |   6 +-
 .../lookalike_model/pipeline/main_clean.py         |  16 +-
 .../lookalike_model/pipeline/main_keywords.py      | 113 +++++++++
 .../lookalike_model/pipeline/main_trainready.py    |  59 +----
 Model/lookalike-model/lookalike_model/run.sh       |   7 +
 .../pipeline/dags => tests/application}/README.md  |   0
 .../pipeline/config_score_matrix_table.yml         |  38 +++
 .../pipeline/config_top_n_similarity.yml           |  38 +++
 .../pipeline/test_score_matrix_table.py            | 224 ++++++++++++++++
 .../test_top_n_similarity_table_generator_1.py     | 183 +++++++++++++
 .../test_top_n_similarity_table_generator_2.py     | 282 +++++++++++++++++++++
 .../pipeline/dags => tests/pipeline}/README.md     |   0
 .../tests/pipeline/config_clean.yml                |   2 +
 .../tests/pipeline/config_keywords.yml             |  22 ++
 .../tests/pipeline/data_generator.py               |  87 +++++--
 .../tests/pipeline/test_main_clean.py              |  24 +-
 .../tests/pipeline/test_main_keywords.py           |  88 +++++++
 Model/lookalike-model/tests/run_test.sh            |  23 +-
 35 files changed, 1376 insertions(+), 591 deletions(-)

diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md 
b/Model/lookalike-model/README.md
similarity index 100%
copy from 
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/README.md
diff --git a/Model/lookalike-model/doc/ssd-v2.pdf 
b/Model/lookalike-model/doc/ssd-v2.pdf
deleted file mode 100644
index 0c0502d..0000000
Binary files a/Model/lookalike-model/doc/ssd-v2.pdf and /dev/null differ
diff --git a/Model/lookalike-model/doc/ssd-v3.pdf 
b/Model/lookalike-model/doc/ssd-v3.pdf
new file mode 100644
index 0000000..3729198
Binary files /dev/null and b/Model/lookalike-model/doc/ssd-v3.pdf differ
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md 
b/Model/lookalike-model/lookalike_model/application/__init__.py
similarity index 100%
copy from 
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/lookalike_model/application/__init__.py
diff --git 
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
 
b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
deleted file mode 100644
index e82c018..0000000
--- 
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-import yaml
-import argparse
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf
-from pyspark.sql.types import FloatType, StringType, StructType, StructField, 
ArrayType, MapType
-# from rest_client import predict, str_to_intlist
-import requests
-import json
-import argparse
-from pyspark.sql.functions import udf
-from math import sqrt
-import time
-
-
-
-
-def distance(l1):
-    def _distance(l2):
-        dist = sum([l1[el]*l2[el] for el,value in l1.items()])
-        return dist
-    return _distance
-
-def x(l1):
-    _udf_distance = udf(distance(l1), FloatType() )
-    return _udf_distance
-
-def run(hive_context, cfg):
-    
-    # input tables
-    keywords_table = cfg["input"]["keywords_table"]
-    seeduser_table = cfg["input"]["seeduser_table"]
-    lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm']
-
-    # output dataframes
-    lookalike_score_table = cfg["output"]["score_table"]
-
-    command = "SELECT * FROM {}"
-    df = hive_context.sql(command.format(lookalike_loaded_table_norm))
-    df_keywords = hive_context.sql(command.format(keywords_table))
-    df_seed_user = hive_context.sql(command.format(seeduser_table))
-
-
-    #### creating a tuple of did and kws for seed users
-    df_seed_user = df_seed_user.join(df.select('did','kws_norm'), on=['did'], 
how='left')
-    # df_seed_user = df_seed_user.withColumn("seed_user_list", zip_("did", 
"kws"))
-    seed_user_list = df_seed_user.select('did','kws_norm').collect()
-    # seed_user list = [(did1, {k1:0, k2:0.2, ...}), (did2, )]
-    # user =
-    c = 0
-    temp_list = []
-    for item in seed_user_list:
-
-        c+= 1
-        if c > 850 :
-            break
-        df = df.withColumn(item[0],x(item[1])(col('kws_norm')))
-
-    df.write.option("header", "true").option(
-        "encoding", 
"UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_score_table)
-
-
-
-if __name__ == "__main__":
-    start = time.time()
-    parser = argparse.ArgumentParser(description=" ")
-    parser.add_argument('config_file')
-    args = parser.parse_args()
-    with open(args.config_file, 'r') as yml_file:
-        cfg = yaml.safe_load(yml_file)
-
-    sc = SparkContext.getOrCreate()
-    sc.setLogLevel('WARN')
-    hive_context = HiveContext(sc)
-
-    run(hive_context=hive_context, cfg=cfg)
-    sc.stop()
-    end = time.time()
-    print('Runtime of the program is:', (end - start))
diff --git 
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
 
b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
deleted file mode 100644
index 2217994..0000000
--- 
a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py
+++ /dev/null
@@ -1,144 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-import yaml
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf, array, mean
-from pyspark.sql.types import FloatType, StringType, StructType, StructField, 
ArrayType, MapType
-import argparse
-from pyspark.sql.functions import udf
-import time
-import math
-
-'''
-spark-submit --executor-memory 16G --driver-memory 24G  --num-executors 16 
--executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g 
distance_table_list.py config.yml
-'''
-
-
-def euclidean(l1):
-    def _euclidean(l2):
-        list = []
-        for item in l1:
-            similarity = 1 - (math.sqrt(sum([(item[i]-l2[i]) ** 2 for i in 
range(len(item))]))/math.sqrt(len(item)))
-            list.append(similarity)
-        return list
-    return _euclidean
-
-
-def dot(l1):
-    def _dot(l2):
-        list = []
-        for item in l1:
-            similarity = sum([item[i]*l2[i] for i in range(len(item))])
-            list.append(similarity)
-        return list
-    return _dot
-
-
-def ux(l1):
-    if alg == "euclidean":
-        _udf_similarity = udf(euclidean(l1), ArrayType(FloatType()))
-    if alg =="dot":
-        _udf_similarity = udf(dot(l1), ArrayType(FloatType()))
-    return _udf_similarity
-
-
-
-def l(d):
-    s = [value for key, value in d.items()]
-    return s
-udf_tolist = udf(l, ArrayType(FloatType()))
-
-def top_n(l):
-    #### top 10
-    n = 10
-    l.sort()
-    return l[-n:]
-udf_top_n = udf(top_n, ArrayType(FloatType()))
-
-def _top_n(l1, l2):
-    n = 10
-    l = sorted(l1+l2)
-    return l[-n:]
-
-_udf_top_n = udf(_top_n, ArrayType(FloatType()))
-
-def _mean(l):
-    ave = sum(l)/len(l)
-    return ave
-udf_mean = udf(_mean, FloatType())
-
-def run(hive_context, cfg):
-
-    ## load dataframes
-    lookalike_score_table_norm = cfg['output']['score_norm_table']
-    keywords_table = cfg["input"]["keywords_table"]
-    seeduser_table = cfg["input"]["seeduser_table"]
-    lookalike_similarity_table = cfg["output"]["similarity_table"]
-
-    command = "SELECT * FROM {}"
-    df = hive_context.sql(command.format(lookalike_score_table_norm))
-    df_keywords = hive_context.sql(command.format(keywords_table))
-    df_seed_user = hive_context.sql(command.format(seeduser_table))
-
-
-    #### creating a tuple of did and kws for seed users
-    if alg == "dot":
-        df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm')))
-    if alg == "euclidean":
-        df = df.withColumn('kws_norm_list', udf_tolist(col('kws')))
-    df_seed_user = df_seed_user.join(df.select('did','kws_norm_list'), 
on=['did'], how='left')
-    seed_user_list = df_seed_user.select('did', 'kws_norm_list').collect()
-
-## batch 1 : 0-100 801 seed
-    batch_length = 800
-    c = 0
-    #### i=0, c=0 , batched_user=[0,200], top_10
-    total_c = len(seed_user_list)
-    df = df.withColumn('top_10', array(lit(0.0)))
-    while total_c > 0 :
-            len_tobe_p = min(batch_length,total_c)
-            total_c-= len_tobe_p
-            batched_user = [item[1] for item in seed_user_list[c: 
c+len_tobe_p]]
-            df = 
df.withColumn("similarity_list",ux(batched_user)(col('kws_norm_list')))
-            df = df.withColumn("top_10", 
_udf_top_n(col("similarity_list"),col("top_10")))
-            c+=len_tobe_p
-
-    df = df.withColumn("mean_score",udf_mean(col("top_10")))
-    df.write.option("header", "true").option(
-        "encoding", 
"UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_similarity_table)
-    extended_did = df.sort(col('mean_score').desc()).select('did', 
'mean_score')
-
-
-
-if __name__ == "__main__":
-    start = time.time()
-    parser = argparse.ArgumentParser(description=" ")
-    parser.add_argument('config_file')
-    args = parser.parse_args()
-    with open(args.config_file, 'r') as yml_file:
-        cfg = yaml.safe_load(yml_file)
-
-    sc = SparkContext.getOrCreate()
-    sc.setLogLevel('WARN')
-    hive_context = HiveContext(sc)
-
-    ## select similarity algorithm
-    alg = cfg["input"]["alg"]
-    run(hive_context=hive_context, cfg=cfg)
-    sc.stop()
-    end = time.time()
-    print('Runtime of the program is:', (end - start))
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py 
b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py
new file mode 100644
index 0000000..3e46a55
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py
@@ -0,0 +1,15 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
\ No newline at end of file
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml 
b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
index cd941f6..485f7be 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml
@@ -1,35 +1,31 @@
-product_tag: 'lookalike'
-pipeline_tag: '08042021'
+product_tag: 'lookalike_application'
+pipeline_tag: '08172021_1000'
 score_generator:
     input:
-        log_table : "lookalike_03042021_logs"
-        did_table: "lookalike_03042021_trainready"
+        log_table : "lookalike_08172021_1000_logs"
+        did_table: "lookalike_08172021_1000_trainready"
         keywords_table: "din_ad_keywords_09172020"
-        test_table: "lookalike_trainready_jimmy_test"
+        significant_keywords_table: "lookalike_08172021_1000_keywords"
         din_model_tf_serving_url: 
"http://10.193.217.105:8506/v1/models/lookalike:predict";
         din_model_length: 20
-        seeduser_table : "lookalike_seeduser"
-        number_of_seeduser: 1000
         extend: 2000
         alg: "euclidean" ##### currently just support "euclideand" and "dot"
     output:
-        did_score_table: "{product_tag}_score_{pipeline_tag}"
-        score_norm_table:  "{product_tag}_score_norm_{pipeline_tag}"
-    normalize: False
-        
+        score_table:  "{product_tag}_{pipeline_tag}_score"
+    normalize: False       
 score_vector:
     keywords_table: "din_ad_keywords_09172020"
-    score_norm_table:  "{product_tag}_score_norm_{pipeline_tag}"
-    score_vector_table: "{product_tag}_score_vector_{pipeline_tag}"
+    score_table:  "{product_tag}_{pipeline_tag}_score"
+    score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
     did_bucket_size: 2
     did_bucket_step: 2
-score_vector_rebucketing:
+score_matrix_table:
     did_bucket_size: 2
     did_bucket_step: 2
-    alpha_did_bucket_size: 20 #default=1000
-    score_vector_alpha_table: '{product_tag}_score_vector_alpha_{pipeline_tag}'
+    score_matrix_table: '{product_tag}_{pipeline_tag}_score_matrix'
 top_n_similarity:
-    did_bucket_step: 1
-    alpha_did_bucket_step: 10
+    did_bucket_size: 100
+    did_bucket_step: 100
+    cross_bucket_size: 1 # in production this should be as same as 
did_bucket_size
     top_n: 10
-    similarity_table: "{product_tag}_similarity_{pipeline_tag}"
\ No newline at end of file
+    similarity_table: "{product_tag}_{pipeline_tag}_similarity"
\ No newline at end of file
diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh 
b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
index d5bf875..a141ebc 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
+++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh
@@ -1,16 +1,10 @@
 #!/bin/bash
 
-# Not used as part of pipeline
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict seed_user_selector.py 
config.yml "29"
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py 
config.yml
 
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py 
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py 
config.yml
 
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py 
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py 
config.yml
 
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
score_vector_rebucketing.py config.yml
-
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
top_n_similarity_table_generator.py config.yml
-
-# Not used as part of pipeline
-spark-submit ---master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py 
config.yml "29"
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
top_n_similarity_table_generator.py config.yml
 
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
index 2dbebda..6dd86b2 100644
--- 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
+++ 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py
@@ -29,7 +29,7 @@ from util import resolve_placeholder
 from lookalike_model.pipeline.util import write_to_table, 
write_to_table_with_partition
 
 '''
-This process generates the score-norm-table with the following format.
+This process generates the score-table with the following format.
 
 DataFrame[age: int, gender: int, did: string, did_index: bigint, 
 interval_starting_time: array<string>, interval_keywords: array<string>, 
@@ -55,7 +55,7 @@ def str_to_intlist(table):
     return ji
 
 
-def inputData(record, keyword, length):
+def input_data(record, keyword, length):
     if len(record['show_counts']) >= length:
         hist = flatten(record['show_counts'][:length])
         instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': 
keyword, 'sl': len(hist)}
@@ -69,7 +69,7 @@ def inputData(record, keyword, length):
 def predict(serving_url, record, length, new_keyword):
     body = {'instances': []}
     for keyword in new_keyword:
-        instance = inputData(record, keyword, length)
+        instance = input_data(record, keyword, length)
         body['instances'].append(instance)
     body_json = json.dumps(body)
     result = requests.post(serving_url, data=body_json).json()
@@ -174,18 +174,21 @@ if __name__ == '__main__':
     hive_context = HiveContext(sc)
 
     # load dataframes
-    did_table, keywords_table, din_tf_serving_url, length = 
cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][
-        'keywords_table'], 
cfg['score_generator']['input']['din_model_tf_serving_url'], 
cfg['score_generator']['input']['din_model_length']
+    did_table, keywords_table, significant_keywords_table, din_tf_serving_url, 
length = cfg['score_generator']['input']['did_table'], 
cfg['score_generator']['input'][
+        'keywords_table'], cfg['score_generator']['input'][
+        'significant_keywords_table'], 
cfg['score_generator']['input']['din_model_tf_serving_url'], 
cfg['score_generator']['input']['din_model_length']
 
     command = 'SELECT * FROM {}'
     df_did = hive_context.sql(command.format(did_table))
-    df_keywords = hive_context.sql(command.format(keywords_table))
+
+    command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS 
T1 JOIN {} AS T2 ON T1.keyword=T2.keyword'
+    df_keywords = hive_context.sql(command.format(keywords_table, 
significant_keywords_table))
     # temporary adding to filter based on active keywords
     df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | 
(df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') |
                                      (df_keywords.keyword == 'social') | 
(df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') |
                                      (df_keywords.keyword == 'entertainment'))
-    did_loaded_table = cfg['score_generator']['output']['did_score_table']
-    score_norm_table = cfg['score_generator']['output']['score_norm_table']
+
+    score_table = cfg['score_generator']['output']['score_table']
 
     # create a CTR score generator instance and run to get the loaded did
     ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, 
din_tf_serving_url, length)
@@ -198,4 +201,4 @@ if __name__ == '__main__':
         df = df.withColumn('kws_norm', udf_normalize(col('kws')))
 
     # save the loaded did to hive table
-    write_to_table_with_partition(df, score_norm_table, 
partition=('did_bucket'), mode='overwrite')
+    write_to_table_with_partition(df, score_table, partition=('did_bucket'), 
mode='overwrite')
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
similarity index 62%
rename from 
Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
rename to 
Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
index 7cae65c..ee82c06 100644
--- 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py
+++ 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py
@@ -17,8 +17,8 @@
 import yaml
 import argparse
 from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.functions import lit, col, udf
+from pyspark.sql import HiveContext, SparkSession, Window
+from pyspark.sql.functions import lit, col, udf, collect_list
 from pyspark.sql.types import FloatType, StringType, StructType, StructField, 
ArrayType, MapType, IntegerType
 # from rest_client import predict, str_to_intlist
 import requests
@@ -36,41 +36,41 @@ from lookalike_model.pipeline.util import write_to_table, 
write_to_table_with_pa
 '''
 
 To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
score_vector_rebucketing.py config.yml
+spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matirx_table.py 
config.yml
 
-This process generates added secondary buckects ids (alpha-did-bucket).
+This process consolidates bucket score vectors into matrices.
 
 '''
 
 
-def assign_new_bucket_id(df, n, new_column_name):
-    def __hash_sha256(s):
-        hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest()
-        return int(hex_value, 16)
-    _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType())
-    df = df.withColumn(new_column_name, _udf(df.did))
-    return df
-
-
-def run(hive_context, cfg):
+def run(spark_session, hive_context, cfg):
 
     score_vector_table = cfg['score_vector']['score_vector_table']
-    bucket_size = cfg['score_vector_rebucketing']['did_bucket_size']
-    bucket_step = cfg['score_vector_rebucketing']['did_bucket_step']
-    alpha_bucket_size = 
cfg['score_vector_rebucketing']['alpha_did_bucket_size']
-    score_vector_alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+    bucket_size = cfg['score_matrix_table']['did_bucket_size']
+    bucket_step = cfg['score_matrix_table']['did_bucket_step']
+    score_matrix_table = cfg['score_matrix_table']['score_matrix_table']
 
     first_round = True
+    num_batches = (bucket_size + bucket_step - 1) / bucket_step
+    batch_num = 1
     for did_bucket in range(0, bucket_size, bucket_step):
-        command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE 
did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, 
did_bucket+bucket_step-1)
+        print('Processing batch {} of {}   bucket number: 
{}'.format(batch_num, num_batches, did_bucket))
+
+        max_bucket = min(did_bucket+bucket_step-1, bucket_size)
+        command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE 
did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, max_bucket)
+        # command = "SELECT did_bucket, collect_list(struct(did, score_vector, 
c1)) AS item FROM {} WHERE did_bucket BETWEEN {} AND {} GROUP BY 
did_bucket".format(score_vector_table, did_bucket, 
min(did_bucket+bucket_step-1, bucket_size))
 
         df = hive_context.sql(command)
-        df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_did_bucket')
+        df = df.groupBy('did_bucket').agg(
+            collect_list('did').alias('did_list'),
+            collect_list('score_vector').alias('score_matrix'),
+            collect_list('c1').alias('c1_list'))
 
         mode = 'overwrite' if first_round else 'append'
-        write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 
'did_bucket', 'alpha_did_bucket'),
-                                      score_vector_alpha_table, 
partition=('did_bucket', 'alpha_did_bucket'), mode=mode)
+        write_to_table_with_partition(df.select('did_list', 'score_matrix', 
'c1_list', 'did_bucket'),
+                                      score_matrix_table, 
partition=('did_bucket'), mode=mode)
         first_round = False
+        batch_num += 1
 
 
 if __name__ == "__main__":
@@ -84,8 +84,9 @@ if __name__ == "__main__":
     sc = SparkContext.getOrCreate()
     sc.setLogLevel('WARN')
     hive_context = HiveContext(sc)
+    spark_session = SparkSession(sc)
 
-    run(hive_context=hive_context, cfg=cfg)
+    run(spark_session, hive_context=hive_context, cfg=cfg)
     sc.stop()
     end = time.time()
     print('Runtime of the program is:', (end - start))
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
index 5c13fc8..aa8ee51 100644
--- 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
+++ 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py
@@ -36,7 +36,7 @@ from util import resolve_placeholder
 '''
 
 To run, execute the following in application folder.
-spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py 
config.yml
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py 
config.yml
 
 This process generates the score_vector_table table.
 
@@ -50,10 +50,11 @@ The top-n-similarity table is
 
 '''
 
+
 def run(hive_context, cfg):
 
     keywords_table = cfg["score_vector"]["keywords_table"]
-    score_norm_table = cfg['score_vector']['score_norm_table']
+    score_table = cfg['score_vector']['score_table']
     score_vector_table = cfg['score_vector']['score_vector_table']
     bucket_size = cfg['score_vector']['did_bucket_size']
     bucket_step = cfg['score_vector']['did_bucket_step']
@@ -65,8 +66,12 @@ def run(hive_context, cfg):
 
     # add score-vector iterativly
     first_round = True
+    num_batches = (bucket_size + bucket_step - 1) / bucket_step
+    batch_num = 1
     for did_bucket in range(0, bucket_size, bucket_step):
-        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket 
BETWEEN {} AND {}".format(score_norm_table, did_bucket, 
did_bucket+bucket_step-1)
+        print('Processing batch {} of {}   bucket number: 
{}'.format(batch_num, num_batches, did_bucket))
+
+        command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket 
BETWEEN {} AND {}".format(score_table, did_bucket, 
min(did_bucket+bucket_step-1, bucket_size))
 
         # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0  
       |
         # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 
0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, 
info -> 0.1978679]|
@@ -75,10 +80,12 @@ def run(hive_context, cfg):
                            udf(lambda kws: [kws[keyword] if keyword in kws 
else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws))
 
         df = df.withColumn('c1', udf(lambda x: 
float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector))
-       
+
         mode = 'overwrite' if first_round else 'append'
         write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 
'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode)
+        # write_to_table_with_partition(df.select('did', 'did_bucket'), 
score_vector_table, partition=('did_bucket'), mode=mode)
         first_round = False
+        batch_num += 1
 
 
 if __name__ == "__main__":
@@ -90,7 +97,7 @@ if __name__ == "__main__":
         cfg = yaml.safe_load(yml_file)
         resolve_placeholder(cfg)
     sc = SparkContext.getOrCreate()
-    sc.setLogLevel('WARN')
+    sc.setLogLevel('INFO')
     hive_context = HiveContext(sc)
 
     run(hive_context=hive_context, cfg=cfg)
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
 
b/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
deleted file mode 100644
index 691d588..0000000
--- 
a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-from pyspark import SparkContext
-from pyspark.sql import HiveContext
-import yaml
-import argparse
-from util import resolve_placeholder
-
-
-'''
-input: logfile
-output: lookalike_seeduser table
-
-
-'''
-
-
-def run(hive_context, cfg, kwi):
-    seed_user_table = cfg['input']['seeduser_table']
-    log_table = cfg['input']['log_table']
-    number_of_seeduser = cfg['input']['number_of_seeduser']
-
-    # command = "select * from (select * from {} where is_click=1 and 
keyword_index=29) as s join (select * from {} where is_click=1 and 
keyword_index=26) as b on b.did = s.did where s.gender = 1"
-    command = "SELECT * FROM {} WHERE is_click=1 AND keyword_index={}"
-    df = hive_context.sql(command.format(log_table, kwi))
-    user_list = 
df.select('did').alias('did').distinct().limit(number_of_seeduser)
-    user_list.cache()
-
-    user_list.write.option("header", "true").option(
-        "encoding", 
"UTF-8").mode("overwrite").format('hive').saveAsTable(seed_user_table)
-
-
-if __name__ == "__main__":
-    """
-    select seed users
-    """
-    parser = argparse.ArgumentParser(description=" ")
-    parser.add_argument('config_file')
-    parser.add_argument('kwi')
-    args = parser.parse_args()
-    kwi = args.kwi
-    with open(args.config_file, 'r') as yml_file:
-        cfg = yaml.safe_load(yml_file)
-        resolve_placeholder(cfg)
-    sc = SparkContext.getOrCreate()
-    sc.setLogLevel('WARN')
-    hive_context = HiveContext(sc)
-
-    run(hive_context=hive_context, cfg=cfg, kwi=kwi)
-    sc.stop()
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
 
b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
index 66363c5..60071b6 100644
--- 
a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
+++ 
b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py
@@ -19,22 +19,16 @@ import argparse
 import pyspark.sql.functions as fn
 
 from pyspark import SparkContext
-from pyspark.sql import HiveContext
-from pyspark.sql.types import FloatType, StringType, StructType, StructField, 
ArrayType, MapType, StructType
+from pyspark.sql import HiveContext, SparkSession
+from pyspark.sql.types import FloatType, StringType, StructType, StructField, 
ArrayType, MapType, IntegerType
+from pyspark.sql.functions import udf, col, explode
 
-# from rest_client import predict, str_to_intlist
-import requests
-import json
-import argparse
-from pyspark.sql.functions import udf
 from math import sqrt
 import time
 import numpy as np
 import itertools
 import heapq
 from util import resolve_placeholder
-
-
 from lookalike_model.pipeline.util import write_to_table, 
write_to_table_with_partition
 
 '''
@@ -53,65 +47,87 @@ The top-n-similarity table is
 '''
 
 
-def run(sc, hive_context, cfg):
+def run(spark_session, hive_context, cfg):
 
-    score_vector_alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+    score_matrix_table = cfg['score_matrix_table']['score_matrix_table']
     similarity_table = cfg['top_n_similarity']['similarity_table']
-    N = cfg['top_n_similarity']['top_n']
-
-    did_bucket_size = cfg['score_vector_rebucketing']['did_bucket_size']
+    did_bucket_size = cfg['top_n_similarity']['did_bucket_size']
     did_bucket_step = cfg['top_n_similarity']['did_bucket_step']
-
-    alpha_bucket_size = 
cfg['score_vector_rebucketing']['alpha_did_bucket_size']
-    alpha_bucket_step = cfg['top_n_similarity']['alpha_did_bucket_step']
+    cross_bucket_size = cfg['top_n_similarity']['cross_bucket_size']
+    top_n_value = cfg['top_n_similarity']['top_n']
 
     first_round = True
+    num_batches = (did_bucket_size + did_bucket_step - 1) / did_bucket_step
+    batch_num = 1
     for did_bucket in range(0, did_bucket_size, did_bucket_step):
+        print('Processing batch {} of {}   bucket number: 
{}'.format(batch_num, num_batches, did_bucket))
 
-        command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE 
did_bucket BETWEEN {} AND {}".format(
-            score_vector_alpha_table, did_bucket, did_bucket + did_bucket_step 
- 1)
+        command = "SELECT did_list, did_bucket, score_matrix, c1_list FROM {} 
WHERE did_bucket BETWEEN {} AND {}".format(
+            score_matrix_table, did_bucket, min(did_bucket + did_bucket_step - 
1, did_bucket_size))
         # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0  
       |
         # [0.24231663, 0.20828941, 0.0]|
         df = hive_context.sql(command)
         df = df.withColumn('top_n_similar_user', fn.array())
 
-        for alpha_bucket in range(0, alpha_bucket_size, alpha_bucket_step):
-            command = """SELECT did, score_vector, c1, alpha_did_bucket 
-            FROM {} WHERE alpha_did_bucket BETWEEN {} AND {}"""
-            command = command.format(score_vector_alpha_table,
-                                     alpha_bucket, alpha_bucket + 
alpha_bucket_step - 1)
+        for cross_bucket in range(0, cross_bucket_size):
+            print('Processing batch {}, alpha bucket {}'.format(batch_num, 
cross_bucket))
+
+            command = """SELECT did_list, score_matrix, c1_list, did_bucket
+            FROM {} WHERE did_bucket = {} """
+            command = command.format(score_matrix_table, cross_bucket)
 
             df_user = hive_context.sql(command)
-            block_user = df_user.select('did', 'score_vector', 'c1').collect()
-            block_user_did_score = ([_['did'] for _ in block_user], 
[_['score_vector'] for _ in block_user])
-            block_user_broadcast = sc.broadcast(block_user_did_score)
-
-            c2 = np.array([_['c1'] for _ in block_user])
-            c2 = np.square(np.linalg.norm(c2)).tolist()
-            c2_broadcast = sc.broadcast(c2)
-
-            def calculate_similarity(user_score_vector, top_n_user_score, c1):
-                m = len(user_score_vector)
-                user_score_vector = np.array(user_score_vector)
-                dids, other_score_vectors = block_user_broadcast.value
-                other_score_vectors = np.array(other_score_vectors)
-                cross_mat = np.matmul(user_score_vector, 
other_score_vectors.transpose())
-                c2 = np.array(c2_broadcast.value)
-                similarity = np.sqrt(m) - np.sqrt(c1 + c2 - 2 * cross_mat)
-                user_score_s = list(itertools.izip(dids, similarity.tolist()))
-                user_score_s.extend(top_n_user_score)
-                user_score_s = heapq.nlargest(N, user_score_s, key=lambda x: 
x[1])
-                return user_score_s
+            cross_users = df_user.select('did_list', 'score_matrix', 
'c1_list').collect()
+
+            if len(cross_users) == 0:
+                continue
+
+            cross_users_did_score = (cross_users[0]['did_list'], 
cross_users[0]['score_matrix'])
+            c2 = np.array(cross_users[0]['c1_list'])
+
+            def calculate_similarity(cross_users_did_score, c2):
+                def __helper(user_score_matrix, top_n_user_score, c1_list):
+                    user_score_matrix = np.array(user_score_matrix)
+                    m = user_score_matrix.shape[1]
+                    cross_dids, cross_score_matrix = cross_users_did_score
+                    cross_score_matrix = np.array(cross_score_matrix)
+                    cross_mat = np.matmul(user_score_matrix, 
cross_score_matrix.transpose())
+
+                    similarity = np.sqrt(m) - 
np.sqrt(np.maximum(np.expand_dims(c1_list, 1) + c2 - (2 * cross_mat), 0.0))
+                    result = []
+                    for cosimilarity, top_n in 
itertools.izip_longest(similarity, top_n_user_score, fillvalue=[]):
+                        user_score_s = list(itertools.izip(cross_dids, 
cosimilarity.tolist()))
+                        user_score_s.extend(top_n)
+                        user_score_s = heapq.nlargest(top_n_value, 
user_score_s, key=lambda x: x[1])
+                        result.append(user_score_s)
+                    return result
+                return __helper
 
             elements_type = StructType([StructField('did', StringType(), 
False), StructField('score', FloatType(), False)])
 
             # update top_n_similar_user field
-            df = df.withColumn('top_n_similar_user', udf(calculate_similarity, 
ArrayType(elements_type))(df.score_vector, df.top_n_similar_user, df.c1))
+            df = df.withColumn('top_n_similar_user', 
udf(calculate_similarity(cross_users_did_score, c2),
+                                                         
ArrayType(ArrayType(elements_type)))(df.score_matrix, df.top_n_similar_user, 
df.c1_list))
+
+        # Unpack the matrices into individual users.
+        # Note: in Spark 2.4, the udf can be replaced with arrays_zip().
+        def combine(x, y):
+            return list(zip(x, y))
+        df = df.withColumn("new", udf(combine, 
ArrayType(StructType([StructField("did", StringType()),
+                                                                     
StructField("top_n_similar_user", ArrayType(StructType([
+                                                                         
StructField("did", StringType(), True),
+                                                                         
StructField("score", FloatType(), True), ]), True)),
+                                                                     
])))(df.did_list, df.top_n_similar_user))
+        df = df.withColumn("new", explode("new"))
+        df = df.select(col("new.did").alias("did"),
+                       
col("new.top_n_similar_user").alias("top_n_similar_user"),
+                       "did_bucket")
 
         mode = 'overwrite' if first_round else 'append'
         # use the partitioned field at the end of the select. Order matters.
-        write_to_table_with_partition(df.select('did', 'top_n_similar_user', 
'did_bucket'), similarity_table, partition=('did_bucket'), mode=mode)
+        write_to_table_with_partition(df, similarity_table, 
partition=('did_bucket'), mode=mode)
         first_round = False
+        batch_num += 1
 
 
 if __name__ == "__main__":
@@ -126,8 +142,9 @@ if __name__ == "__main__":
     sc = SparkContext.getOrCreate()
     sc.setLogLevel('INFO')
     hive_context = HiveContext(sc)
+    spark_session = SparkSession(sc)
 
-    run(sc=sc, hive_context=hive_context, cfg=cfg)
+    run(spark_session, hive_context, cfg)
     sc.stop()
     end = time.time()
     print('Runtime of the program is:', (end - start))
diff --git a/Model/lookalike-model/lookalike_model/application/rest_client.py 
b/Model/lookalike-model/lookalike_model/application/rest_client.py
deleted file mode 100644
index c3b7a02..0000000
--- a/Model/lookalike-model/lookalike_model/application/rest_client.py
+++ /dev/null
@@ -1,89 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-
-#  http://www.apache.org/licenses/LICENSE-2.0.html
-
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-"""
-THE script gets the data, process it and send the request to
-the rest client and print out the response from the the rest API
-"""
-import requests
-import json
-import argparse
-import yaml
-
-
-def flatten(lst):
-    f = [y for x in lst for y in x]
-    return f
-
-
-def str_to_intlist(table):
-    ji = []
-    for k in [table[j].split(",") for j in range(len(table))]:
-        s = []
-        for a in k:
-            b = int(a.split(":")[0])
-            s.append(b)
-        ji.append(s)
-    return ji
-
-
-def inputData(record, keyword, length):
-    if len(record['show_counts']) >= length:
-        hist = flatten(record['show_counts'][:length])
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': 
keyword, 'sl': len(hist)}
-    else:
-        hist = flatten(record['show_counts'])
-        # [hist.extend([0]) for i in range(length - len(hist))]
-        instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': 
keyword, 'sl': len(hist)}
-    return instance
-
-
-def predict(serving_url, record, length, new_keyword):
-    body = {"instances": []}
-    for keyword in new_keyword:
-        instance = inputData(record, keyword, length)
-        body['instances'].append(instance)
-    body_json = json.dumps(body)
-    result = requests.post(serving_url, data=body_json).json()
-    if 'error' in result.keys():
-        predictions = result['error']
-    else:
-        predictions = result['predictions']
-    return predictions
-
-
-def run(cfg):
-    length = cfg['input']['din_model_length']
-    url = cfg['input']['din_model_tf_serving_url']
-    ##time_interval, did, click_counts, show_counts, media_category, 
net_type_index, gender, age, keyword
-    record = {"did": 0, "show_counts": ['25:3', '29:6,25:2', '29:1,25:2,14:2', 
'14:1,29:2,25:2',
-                                          '29:1', '26:1,14:2,25:4', 
'14:1,25:3'], "show_clicks": [], "age": '10', "gender": '3'}
-    record['show_counts'] = str_to_intlist(record['show_counts'])
-    new_keyword = [26, 27, 29]
-    response = predict(serving_url=url, record=record, length=length, 
new_keyword=new_keyword)
-
-    print(response)
-
-
-if __name__ == '__main__':  # record is equal to window size
-    parser = argparse.ArgumentParser(description='Prepare data')
-    parser.add_argument('config_file')
-    args = parser.parse_args()
-
-    with open(args.config_file, 'r') as ymlfile:
-        cfg = yaml.safe_load(ymlfile)
-
-    run(cfg)
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py 
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
similarity index 87%
rename from 
Model/lookalike-model/lookalike_model/application/pipeline/validation.py
rename to 
Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
index 5409472..08f1f5a 100644
--- a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py
+++ 
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py
@@ -20,6 +20,10 @@ from pyspark import SparkContext
 import argparse, yaml
 from util import resolve_placeholder
 
+'''
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py 
config.yml "29"
+'''
+
 
 def counting(click,kwi):
     count_click = 0
@@ -31,10 +35,10 @@ def counting(click,kwi):
 
 def run(hive_context,cfg, kwi):
     lookalike_score_table = cfg["output"]["similarity_table"]
-    seed_user_table = cfg['input']['seeduser_table']
+    seed_user_table = 'lookalike_seeduser'
     extend = cfg['input']['extend']
-    test_table = cfg['input']['test_table']
-    number_of_seeduser = cfg['input']['number_of_seeduser']
+    test_table = 'lookalike_trainready_jimmy_test'
+    number_of_seeduser = 1000
 
     ######### filtering the df and removing seed users
     command = "select * from {}"
diff --git 
a/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
 
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
new file mode 100644
index 0000000..e92ea97
--- /dev/null
+++ 
b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py
@@ -0,0 +1,94 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from pyspark.sql.functions import lit, col, udf, rand
+from pyspark.sql import HiveContext
+from pyspark import SparkContext
+import argparse
+import yaml
+from util import resolve_placeholder
+
+'''
+
+spark-submit --master yarn --num-executors 10 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation_plan_1.py 
config.yml '29'
+
+1. Randomly select 1000 users. It is user_list.
+2. Calculate click/imp for kw. It is actual_interest.
+3. Sort user_list based on actual_interest and get first 500. This is list M.
+4. Sort user_list based on kw score and get first 500. It is list N.
+5. Final result is (size of common(M,N) ) / (size of M)
+
+
+'''
+
+
+def process(user, kwi):
+    l = user['kwi_show_counts']
+    imp_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0 
for _ in l for j in _.split(',')])
+
+    l = user['kwi_click_counts']
+    click_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0 
for _ in l for j in _.split(',')])
+
+    actual_interest = click_for_kwi * 1.0 / imp_for_kwi if imp_for_kwi != 0 
else 0
+
+    kwi_score = 0
+    if kwi in user['kws']:
+        kwi_score = user['kws']['kwi']
+
+    return (user['did'], actual_interest, kwi_score)
+
+
+def run(hive_context, cfg, kwi):
+    RANDOM_USERS_SIZE = 1000
+    lookalike_score_table = cfg['score_generator']['output']['score_table']
+
+    # Randomly select 1000 users. It is user_list.
+    command = "SELECT * FROM {}"
+    df = hive_context.sql(command.format(lookalike_score_table))
+    user_list = df.orderBy(rand()).limit(RANDOM_USERS_SIZE).collect()
+    user_metrics = [process(user, kwi) for user in user_list]
+
+    n = sorted(user_metrics, key=lambda x: x[1], 
reverse=True)[:RANDOM_USERS_SIZE//2]
+    m = sorted(user_metrics, key=lambda x: x[2], 
reverse=True)[:RANDOM_USERS_SIZE//2]
+
+    n = set([_[0] for _ in n])
+    m = set([_[0] for _ in m])
+
+    size_of_common = len(n.intersection(m))
+    result = size_of_common*1.0/(RANDOM_USERS_SIZE//2)
+
+    print(result)
+
+
+if __name__ == "__main__":
+    """
+    validate the result
+    """
+    parser = argparse.ArgumentParser(description=" ")
+    parser.add_argument('config_file')
+    parser.add_argument('kwi')
+    args = parser.parse_args()
+    kwi = args.kwi
+    with open(args.config_file, 'r') as yml_file:
+        cfg = yaml.safe_load(yml_file)
+        resolve_placeholder(cfg)
+
+    sc = SparkContext.getOrCreate()
+    sc.setLogLevel('WARN')
+    hive_context = HiveContext(sc)
+
+    run(hive_context=hive_context, cfg=cfg, kwi=kwi)
+    sc.stop()
diff --git a/Model/lookalike-model/lookalike_model/config.yml 
b/Model/lookalike-model/lookalike_model/config.yml
index dbfdf08..fe05afc 100644
--- a/Model/lookalike-model/lookalike_model/config.yml
+++ b/Model/lookalike-model/lookalike_model/config.yml
@@ -99,6 +99,9 @@ features:
               'task_id',
               'pps_inside_exprmt_ab_tag']
 pipeline:
+  main_keywords:
+    keyword_output_table: '{product_tag}_{pipeline_tag}_keywords'
+    keyword_threshold: 0.01  # Portion of showlog traffic that a keyword must 
reach to be included.
   main_clean:  
     did_bucket_num: 2 # Number of partitions for did
     load_logs_in_minutes: 14400 #1440/day, original=14400
@@ -161,9 +164,6 @@ pipeline:
     logs_output_table_name: '{product_tag}_{pipeline_tag}_logs'
   main_trainready:
     trainready_output_table: '{product_tag}_{pipeline_tag}_trainready'
-    show_threshold_low: 0.5     # per day (set to -1 for no low threshold)
-    show_threshold_high: -1     # per day (set to -1 for no high threshold)
-    active_interval_threshold: 0.2   # proportion of days  (set to -1 for no 
active day filter)
   tfrecords:
     tfrecords_statistics_path: 
'{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl'
     tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # it is hdfs 
location for tfrecords, over-writes the existing files
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py 
b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
index 9efa3c6..ffa38e9 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py
@@ -20,7 +20,7 @@ from datetime import datetime, timedelta
 import timeit
 
 from pyspark import SparkContext
-from pyspark.sql.functions import col, udf
+from pyspark.sql.functions import col, udf, collect_set
 from pyspark.sql.types import BooleanType, IntegerType, StringType
 from pyspark.sql import HiveContext
 from util import load_config, load_batch_config, load_df
@@ -90,6 +90,12 @@ def clean_batched_log(df, df_persona, conditions, 
df_keywords, did_bucket_num):
     df = add_did_bucket(df, did_bucket_num)
     return df
 
+def filter_keywords(df, keywords):
+    # User defined function to return if the keyword is in the inclusion set.
+    _udf = udf(lambda x: x in keywords, BooleanType())
+
+    # Return the filtered dataframe.
+    return df.filter(_udf(col('keyword')))
 
 def clean_logs(cfg, df_persona, df_keywords, log_table_names):
     sc = SparkContext.getOrCreate()
@@ -185,6 +191,8 @@ def run(hive_context, cfg):
 
     did_bucket_num = cfg_clean['did_bucket_num']
 
+    keywords_effective_table = 
cfg['pipeline']['main_keywords']['keyword_output_table']
+
     command = """SELECT did, 
                         gender_new_dev AS gender, 
                         forecast_age_dev AS age 
@@ -201,6 +209,12 @@ def run(hive_context, cfg):
         df_keywords = load_df(hive_context, keywords_table)
     #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')]
 
+    # Use the effective keyword table to filter the keyword table which
+    # will serve to filter the show and click log tables.
+    df_effective_keywords = load_df(hive_context, keywords_effective_table)
+    effective_keywords = 
df_effective_keywords.select(collect_set('keyword')).first()[0]
+    df_keywords = filter_keywords(df_keywords, effective_keywords)
+
     log_table_names = (showlog_table, showlog_new_table, clicklog_table, 
clicklog_new_table)
 
     clean_logs(cfg, df_persona, df_keywords, log_table_names)
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py 
b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py
new file mode 100644
index 0000000..e09e16b
--- /dev/null
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py
@@ -0,0 +1,113 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from datetime import datetime, timedelta
+from pyspark import SparkContext
+
+from util import load_config, load_batch_config, print_batching_info
+from util import write_to_table, generate_add_keywords, resolve_placeholder
+
+
+def run(hive_context, showlog_table, keywords_mapping_table, 
create_keywords_mapping, 
+    start_date, end_date, load_minutes, keyword_threshold, 
effective_keywords_table):
+    """
+    # This script goes through the showlog and identifies all the 
+    # keywords that comprise a portion of the overall traffic greater
+    # than the specified threshold.
+    """
+
+    # Create ad keywords table if does not exist.
+    if create_keywords_mapping:
+        generate_add_keywords(keywords_mapping_table)
+    #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')]
+
+    starting_time = datetime.strptime(start_date, "%Y-%m-%d")
+    ending_time = datetime.strptime(end_date, "%Y-%m-%d")
+
+    # In batches, get the show counts for all of the keywords.
+    keyword_totals = {}
+    batched_round = 1
+    while starting_time < ending_time:
+        time_start = starting_time.strftime("%Y-%m-%d %H:%M:%S")
+        batch_time_end = starting_time + timedelta(minutes=load_minutes)
+        batch_time_end = min(batch_time_end, ending_time)
+        time_end = batch_time_end.strftime("%Y-%m-%d %H:%M:%S")
+        print_batching_info("Main keywords", batched_round, time_start, 
time_end)
+
+        # Get the impressions for the time window joined with the keywords.
+        command = """SELECT 
+                    logs.spread_app_id, 
+                    logs.show_time,
+                    kw.keyword 
+                    FROM {log_table} as logs inner join {keyword_table} as kw 
on logs.spread_app_id = kw.spread_app_id
+                    WHERE logs.show_time >= '{time_start}' AND show_time < 
'{time_end}' """ 
+        df_showlog_batched = 
hive_context.sql(command.format(log_table=showlog_table, 
+            keyword_table=keywords_mapping_table, time_start=time_start, 
time_end=time_end))
+
+        # Get the number of impressions for each keyword.
+        df = df_showlog_batched.groupby('keyword').count().collect()
+
+        # Add the impression count for each keyword to the dictionary.
+        for row in df:
+            keyword_totals[row['keyword']] = 
keyword_totals.get(row['keyword'], 0) + int(row['count'])
+        starting_time = batch_time_end
+        batched_round += 1
+
+    # With the total keyword counts calculated, identify the keywords that meet
+    # the threshold to be included.
+    # Get the total and calculate the count threshold for effective keywords.
+    total_impressions = sum(keyword_totals.values())
+    impression_threshold = keyword_threshold * total_impressions
+
+    # For each keyword, if its count is greater than the threshold, add
+    # it to the effective keyword list.
+    effective_keywords = []
+    for key, value in keyword_totals.items():
+        if value > impression_threshold:
+            effective_keywords.append((key,))  # Append as a tuple
+
+    # Create the dataframe with the results and save to Hive.
+    sc = SparkContext.getOrCreate()
+    df_effective_keywords = 
sc.parallelize(effective_keywords).toDF(['keyword'])
+    write_to_table(df_effective_keywords, effective_keywords_table)
+
+
+if __name__ == "__main__":
+
+    """
+    main_keywords is a process to identify the effective keywords that 
+    comprise a percentage of the traffic above a given threshold.
+    """
+    sc, hive_context, cfg = load_config(
+        description="clean data of persona, clicklog and showlog.")
+    resolve_placeholder(cfg)
+
+    cfg_clean = cfg['pipeline']['main_clean']
+    showlog_table = cfg['showlog_table_name']
+    keywords_mapping_table = cfg['keywords_table']
+    create_keywords_mapping = cfg_clean['create_keywords']
+
+    cfg_keywords = cfg['pipeline']['main_keywords']
+    keyword_threshold = cfg_keywords['keyword_threshold']
+    effective_keywords_table = cfg_keywords['keyword_output_table']
+
+    start_date, end_date, load_minutes = load_batch_config(cfg)
+
+    run(hive_context, showlog_table, keywords_mapping_table, 
create_keywords_mapping, 
+        start_date, end_date, load_minutes, keyword_threshold, 
effective_keywords_table)
+
+    sc.stop()
+
diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py 
b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
index cf8218f..0fc3247 100644
--- a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
+++ b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py
@@ -23,7 +23,7 @@ from pyspark import SparkContext
 from pyspark.sql import functions as fn
 from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, 
first, create_map, monotonically_increasing_id, row_number
 from pyspark.sql.window import Window
-from pyspark.sql.types import FloatType, IntegerType, ArrayType, StringType, 
LongType
+from pyspark.sql.types import IntegerType, ArrayType, StringType, LongType
 from pyspark.sql import HiveContext
 from datetime import datetime, timedelta
 from util import write_to_table, write_to_table_with_partition, 
print_batching_info, resolve_placeholder, load_config, load_batch_config, 
load_df
@@ -36,54 +36,10 @@ def date_to_timestamp(dt):
     epoch = datetime.utcfromtimestamp(0)
     return int((dt - epoch).total_seconds())
 
-def filter_users(df, show_threshold_low, show_threshold_high, 
active_interval_threshold, total_num_intervals):
-    def list_map_count(x):
-        value_total = 0
-        for cell in x:
-            key_values = cell.split(',')
-            for key_value in key_values:
-                print('list_map_count: {}'.format(key_value))
-                _, value = key_value.split(':')
-                value_total += int(value)
-        return float(value_total)/total_num_intervals
-
-    # Returns the ratio of items in the list that have non-zero values.
-    # def list_map_nonzero_count(x):
-    def list_map_nonzero_ratio(x):
-        active_intervals = 0
-        for cell in x:
-            key_values = cell.split(',')
-            for key_value in key_values:
-                print('list_map_nonzero_ratio: {}'.format(key_value))
-                _, value = key_value.split(':')
-                if int(value) != 0:
-                    active_intervals += 1
-                    break
-        return float(active_intervals)/total_num_intervals
-
-    # Aggregate user activity by impressions.
-    df = df.withColumn('total_show_count', udf(list_map_count, 
FloatType())(col('kwi_show_counts')))
-
-    # Filter out users below set activity level.
-    if show_threshold_low >= 0:
-        df = df.filter(df.total_show_count > show_threshold_low)
-    if show_threshold_high >= 0:
-        df = df.filter(df.total_show_count < show_threshold_high)
-
-    # Calculate the number of active intervals.
-    df = df.withColumn('show_active_interval_ratio', 
udf(list_map_nonzero_ratio, FloatType())(col('kwi_show_counts')))
-
-    # Filter out users below set number of active intervals.
-    if (active_interval_threshold >= 0):
-        df = df.filter(df.show_active_interval_ratio > 
active_interval_threshold)
-
-    return df
-
 
 def generate_trainready(hive_context, batch_config,
                         interval_time_in_seconds,
-                        logs_table_name, trainready_table, did_bucket_num, 
-                        show_threshold_low, show_threshold_high, 
active_interval_threshold):
+                        logs_table_name, trainready_table, did_bucket_num):
 
     def group_batched_logs(df_logs):
         # group logs from did + interval_time + keyword.
@@ -240,9 +196,6 @@ def generate_trainready(hive_context, batch_config,
         for i, feature_name in enumerate(['interval_starting_time', 
'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']):
             df = df.withColumn(feature_name, col('metrics_list').getItem(i))
 
-        # Filter the users by activity level.
-        df = filter_users(df, show_threshold_low, show_threshold_high, 
active_interval_threshold, len(all_intervals))
-
         # Add did_index
         w = Window.orderBy("did_bucket", "did")
         df = df.withColumn('row_number', row_number().over(w))
@@ -265,17 +218,11 @@ def run(hive_context, cfg):
 
     cfg_train = cfg['pipeline']['main_trainready']
     trainready_table = cfg_train['trainready_output_table']
-    show_threshold_low = cfg_train['show_threshold_low']
-    show_threshold_high = cfg_train['show_threshold_high']
-    active_interval_threshold = cfg_train['active_interval_threshold']
-
     did_bucket_num = cfg_clean['did_bucket_num']
 
     batch_config = load_batch_config(cfg)
 
-    generate_trainready(hive_context, batch_config, interval_time_in_seconds, 
-        logs_table_name, trainready_table, did_bucket_num, 
-        show_threshold_low, show_threshold_high, active_interval_threshold)
+    generate_trainready(hive_context, batch_config, interval_time_in_seconds, 
logs_table_name, trainready_table, did_bucket_num)
 
 
 if __name__ == "__main__":
diff --git a/Model/lookalike-model/lookalike_model/run.sh 
b/Model/lookalike-model/lookalike_model/run.sh
index b24adab..201fa0a 100644
--- a/Model/lookalike-model/lookalike_model/run.sh
+++ b/Model/lookalike-model/lookalike_model/run.sh
@@ -1,5 +1,12 @@
 #!/bin/bash
 
+# main_keywords: identify the keywords with proportion of traffic above set 
threshold.
+if false
+then
+    # generate the effective keywords table. 
+    spark-submit --master yarn --num-executors 20 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
pipeline/main_keywords.py config.yml
+fi
+
 # main_clean: preparing cleaned persona, click and show logs data.
 if false
 then
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md 
b/Model/lookalike-model/tests/application/README.md
similarity index 100%
copy from 
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
copy to Model/lookalike-model/tests/application/README.md
diff --git 
a/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
 
b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
new file mode 100644
index 0000000..eec98fc
--- /dev/null
+++ 
b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml
@@ -0,0 +1,38 @@
+product_tag: 'lookalike_application'
+pipeline_tag: 'unittest_score_matrix_table'
+score_generator:
+    input:
+        log_table : "lookalike_03042021_logs"
+        did_table: "lookalike_03042021_trainready"
+        keywords_table: "din_ad_keywords_09172020"
+        din_model_tf_serving_url: 
"http://10.193.217.105:8506/v1/models/lookalike:predict";
+        din_model_length: 20
+        extend: 2000
+        alg: "euclidean" ##### currently just support "euclideand" and "dot"
+    output:
+        score_table:  "{product_tag}_score_{pipeline_tag}"
+    normalize: False
+        
+score_vector:
+    keywords_table: "din_ad_keywords_09172020"
+    score_table:  "{product_tag}_{pipeline_tag}_score"
+    score_vector_table: '{product_tag}_{pipeline_tag}_input_score_vector'
+    did_bucket_size: 2
+    did_bucket_step: 2
+score_vector_rebucketing:
+    did_bucket_size: 2
+    did_bucket_step: 1
+    alpha_did_bucket_size: 4
+    score_vector_alpha_table: 
'{product_tag}_{pipeline_tag}_input_score_vector_alpha'
+score_matrix_table:
+    did_bucket_size: 1
+    did_bucket_step: 1
+    score_matrix_table: '{product_tag}_{pipeline_tag}_output_score_matrix'
+top_n_similarity:
+    did_bucket_size: 1
+    did_bucket_step: 1
+    cross_bucket_size: 1
+    cross_bucket_step: 1
+    alpha_did_bucket_step: 1
+    top_n: 10
+    similarity_table: "{product_tag}_{pipeline_tag}_output_similarity"
\ No newline at end of file
diff --git 
a/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml 
b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml
new file mode 100644
index 0000000..d18d6e9
--- /dev/null
+++ 
b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml
@@ -0,0 +1,38 @@
+product_tag: 'lookalike_application'
+pipeline_tag: 'unittest_top_n_similarity'
+score_generator:
+    input:
+        log_table : "lookalike_03042021_logs"
+        did_table: "lookalike_03042021_trainready"
+        keywords_table: "din_ad_keywords_09172020"
+        din_model_tf_serving_url: 
"http://10.193.217.105:8506/v1/models/lookalike:predict";
+        din_model_length: 20
+        extend: 2000
+        alg: "euclidean" ##### currently just support "euclideand" and "dot"
+    output:
+        score_table:  "{product_tag}_score_{pipeline_tag}"
+    normalize: False
+        
+score_vector:
+    keywords_table: "din_ad_keywords_09172020"
+    score_table:  "{product_tag}_{pipeline_tag}_score"
+    score_vector_table: "{product_tag}_{pipeline_tag}_score_vector"
+    did_bucket_size: 2
+    did_bucket_step: 2
+score_vector_rebucketing:
+    did_bucket_size: 2
+    did_bucket_step: 1
+    alpha_did_bucket_size: 4
+    score_vector_alpha_table: 
'{product_tag}_{pipeline_tag}_input_score_vector_alpha'
+score_matrix_table:
+    did_bucket_size: 2
+    did_bucket_step: 1
+    score_matrix_table: '{product_tag}_{pipeline_tag}_input_score_matrix'
+top_n_similarity:
+    did_bucket_size: 1
+    did_bucket_step: 1
+    cross_bucket_size: 1
+    cross_bucket_step: 1
+    alpha_did_bucket_step: 1
+    top_n: 10
+    similarity_table: "{product_tag}_{pipeline_tag}_output_similarity"
\ No newline at end of file
diff --git 
a/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py 
b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py
new file mode 100644
index 0000000..035c994
--- /dev/null
+++ 
b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py
@@ -0,0 +1,224 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.types import DataType, StringType, StructField, StructType, 
FloatType, IntegerType, ArrayType, MapType
+from lookalike_model.application.pipeline import util, score_matrix_table
+
+
+class TestTopNSimilarityTableGenerator(unittest.TestCase):
+
+    def setUp (self):
+        # Set the log level.
+        self.sc = SparkContext.getOrCreate()
+        self.sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit 
test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(self.sc)
+
+    def test_run(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_score_matrix_table.yml', 'r') 
as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+
+        # Get the names of the input and output tables.
+        vector_table = cfg['score_vector']['score_vector_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+
+        # Create the input table.
+        self.create_vector_table(vector_table, True)
+
+        # Run the function being tested.
+        score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by 
did_bucket""".format(matrix_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        command = """select * from {} order by 
did_bucket""".format(vector_table)
+        df_vector = self.hive_context.sql(command)
+
+        # Validate the output.
+        self.validate_similarity_table(df, df_vector, True)
+
+    def test_run2(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_score_matrix_table.yml', 'r') 
as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+
+        # Get the names of the input and output tables.
+        vector_table = cfg['score_vector']['score_vector_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        
+        cfg['score_matrix_table']['did_bucket_size'] = 2
+
+        # Create the input table.
+        self.create_vector_table(vector_table, False)
+
+        # Run the function being tested.
+        score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by 
did_bucket""".format(matrix_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        command = """select * from {} order by 
did_bucket""".format(vector_table)
+        df_vector = self.hive_context.sql(command)
+
+        # Validate the output.
+        self.validate_similarity_table(df, df_vector, False)
+
+    def test_run3(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_score_matrix_table.yml', 'r') 
as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+
+        # Get the names of the input and output tables.
+        vector_table = cfg['score_vector']['score_vector_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        
+        cfg['score_matrix_table']['did_bucket_size'] = 2
+        cfg['score_matrix_table']['did_bucket_step'] = 2
+
+        # Create the input table.
+        self.create_vector_table(vector_table, False)
+
+        # Run the function being tested.
+        score_matrix_table.run(self.spark, self.hive_context, cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by 
did_bucket""".format(matrix_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        command = """select * from {} order by 
did_bucket""".format(vector_table)
+        df_vector = self.hive_context.sql(command)
+
+        # Validate the output.
+        self.validate_similarity_table(df, df_vector, False)
+
+    def create_vector_table (self, table_name, same_bucket=False):
+        later_bucket = 1
+        if same_bucket:
+            later_bucket = 0
+
+        data = [
+            ('0000001', [0.1, 0.8, 0.9], 1.46, 0),
+            ('0000002', [0.1, 0.1, 0.1], 0.03, 0),
+            ('0000003', [0.1, 0.8, 0.9], 1.46, later_bucket),
+            ('0000004', [0.1, 0.2, 0.3], 0.14, later_bucket),
+        ]
+
+        schema = StructType([
+            StructField("did", StringType(), True),
+            StructField("score_vector", ArrayType(FloatType(), True), True),
+            StructField("c1", FloatType(), True),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        util.write_to_table(df, table_name)
+
+    def create_matrix(self):
+        data = [
+            (['0000001', '0000002', '0000003', '0000004'], 
+            [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2, 
0.3]], 
+            [1.46, 0.03, 1.46, 0.14], 
+            0)
+        ]
+
+        schema = StructType([
+            StructField("did_list", ArrayType(StringType(), True)),
+            StructField("score_matrix", ArrayType(ArrayType(FloatType(), 
True)), True),
+            StructField("c1_list", ArrayType(FloatType(), True)),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        return df
+
+    def create_matrix2(self):
+        data = [
+            (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], 
[1.46, 0.03], 0),
+            (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], 
[1.46, 0.14], 1)
+        ]
+
+        schema = StructType([
+            StructField("did_list", ArrayType(StringType(), True)),
+            StructField("score_matrix", ArrayType(ArrayType(FloatType(), 
True)), True),
+            StructField("c1_list", ArrayType(FloatType(), True)),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        return df
+
+    def validate_similarity_table (self, df, df_vector, same_bucket=False):
+        if same_bucket:
+            df_ref = self.create_matrix()
+        else:
+            df_ref = self.create_matrix2()
+
+        # Verify the column names.
+        self.assertEqual(len(df.columns), len(df_ref.columns))
+        for name in df.columns:
+            self.assertIn(name, df_ref.columns)
+
+        # Verify the number of rows.
+        self.assertEqual(df.count(), df_ref.count())
+
+        # Index the score vector by did_bucket and did for quick reference.
+        vectors = {}
+        for row in df_vector.collect():
+            did = row['did']
+            score_vector = row['score_vector']
+            c1 = row['c1']
+            did_bucket = row['did_bucket']
+            if did_bucket not in vectors:
+                vectors[did_bucket] = {did: (score_vector, c1)}
+            else:
+                vectors[did_bucket][did] = (score_vector, c1)
+
+        # Check the row values.
+        for row in df.collect():
+            did_bucket = row['did_bucket']
+            self.assertIn(did_bucket, vectors)
+            did_map = vectors[did_bucket]
+            for did, vector, c1 in zip(row['did_list'], row['score_matrix'], 
row['c1_list']):
+                self.assertIn(did, did_map)
+                self.assertEqual(vector, did_map[did][0])
+                self.assertEqual(c1, did_map[did][1])
+
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
\ No newline at end of file
diff --git 
a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
 
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
new file mode 100644
index 0000000..ddecac8
--- /dev/null
+++ 
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py
@@ -0,0 +1,183 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import HiveContext, Row, SparkSession
+from pyspark.sql.functions import col, udf, collect_set
+from pyspark.sql.types import IntegerType, BooleanType, StructType, 
StructField, StringType, StructType, ArrayType, FloatType
+from lookalike_model.pipeline import main_clean, util
+from lookalike_model.pipeline.util import write_to_table
+from lookalike_model.application.pipeline import 
top_n_similarity_table_generator
+import random
+import string
+
+'''
+spark-submit --master yarn --num-executors 2 --executor-cores 5 
--executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g 
--conf spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
test_top_n_similarity_table_generator_1.py
+'''
+
+
+def random_string_generator(str_size):
+    PREFIX = 'lookalike_application_unittest_'
+    return PREFIX+''.join(random.choice(string.ascii_letters) for _ in 
range(str_size))
+
+
+class TestMainClean(unittest.TestCase):
+
+    def setUp(self):
+        # Set the log level.
+        self.sc = SparkContext.getOrCreate()
+        self.sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit 
test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(self.sc)
+
+    def drop_table(self, table_name):
+        self.hive_context.sql('DROP TABLE {}'.format(table_name))
+
+    def create_matrix_table(self, data, table_name):
+        schema = StructType([
+            StructField("did_list", ArrayType(StringType(), False)),
+            StructField("score_matrix", ArrayType(ArrayType(FloatType(), 
False)), False),
+            StructField("c1_list", ArrayType(FloatType(), False)),
+            StructField("did_bucket", IntegerType(), False)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        util.write_to_table(df, table_name)
+
+    def run_top_n_similarity_table_generator(self, cfg, _input):
+        self.create_matrix_table(_input, 
cfg['score_matrix_table']['score_matrix_table'])
+        top_n_similarity_table_generator.run(self.sc, self.hive_context, cfg)
+        result = self.hive_context.sql('SELECT 
did,top_n_similar_user,did_bucket FROM 
{}'.format(cfg['top_n_similarity']['similarity_table']))
+        return result
+
+    def compare_output_and_expected_and_cleanup(self, cfg, test_name, 
df_output, _expected_output):
+        elements_type = StructType([StructField('did', StringType(), False), 
StructField('score', FloatType(), False)])
+        _schema = StructType([StructField('did', StringType(), False), 
StructField('top_n_similar_user',
+                                                                               
    ArrayType(elements_type), False), StructField('did_bucket', IntegerType(), 
False)])
+        df_expected_output = 
self.hive_context.createDataFrame(_expected_output, _schema)
+
+        df_output = df_output.sort('did')
+        df_expected_output = df_expected_output.sort('did')
+
+        print('Test name : {}'.format(test_name))
+
+        print('Expected')
+        df_expected_output.show(10, False)
+
+        print('Output')
+        df_output.show(10, False)
+
+        diff = df_output.subtract(df_expected_output)
+        print('Difference')
+        diff.show(10, False)
+        diff_count = diff.count()
+
+        # Clean up: Remove tmp tables
+        self.drop_table(cfg['score_matrix_table']['score_matrix_table'])
+        self.drop_table(cfg['top_n_similarity']['similarity_table'])
+
+        return diff_count == 0
+
+    def test_run_1(self):
+        cfg = {
+            'score_matrix_table': {
+                'did_bucket_size': 2,
+                'did_bucket_step': 2,
+                'score_matrix_table': random_string_generator(10)
+            },
+
+            'top_n_similarity': {'did_bucket_size': 2,
+                                 'did_bucket_step': 2,
+                                 'cross_bucket_size': 2,
+                                 'top_n': 10,
+                                 'similarity_table': 
random_string_generator(10)}
+        }
+
+        _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46, 
1.46], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46, 1.46], 1)]
+        _expected_output = [
+            ('1', [Row(did='1', score=1.7316996), Row(did='2', 
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', 
score=1.7316996)], 0),
+            ('2', [Row(did='1', score=1.7316996), Row(did='2', 
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', 
score=1.7316996)], 0),
+            ('3', [Row(did='1', score=1.7316996), Row(did='2', 
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', 
score=1.7316996)], 1),
+            ('4', [Row(did='1', score=1.7316996), Row(did='2', 
score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', 
score=1.7316996)], 1),
+        ]
+
+        df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+        are_equal = self.compare_output_and_expected_and_cleanup(cfg, 
'test_run_1', df_output, _expected_output)
+        self.assertTrue(are_equal)
+
+    def test_run_2(self):
+        cfg = {
+            'score_matrix_table': {
+                'did_bucket_size': 2,
+                'did_bucket_step': 2,
+                'score_matrix_table': random_string_generator(10)
+            },
+
+            'top_n_similarity': {'did_bucket_size': 2,
+                                 'did_bucket_step': 2,
+                                 'cross_bucket_size': 2,
+                                 'top_n': 10,
+                                 'similarity_table': 
random_string_generator(10)}
+        }
+
+        _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 
0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)]
+        _expected_output = [
+            ('1', [Row(did='1', score=1.7316996), Row(did='3', 
score=1.7316996), Row(did='4', score=0.8835226), Row(did='2', 
score=0.6690362)], 0),
+            ('2', [Row(did='2', score=1.7320508), Row(did='4', 
score=1.5084441), Row(did='3', score=0.6690362), Row(did='1', 
score=0.6690362)], 0),
+            ('3', [Row(did='1', score=1.7316996), Row(did='3', 
score=1.7316996), Row(did='4', score=0.8835226), Row(did='2', 
score=0.6690362)], 1),
+            ('4', [Row(did='4', score=1.7320508), Row(did='2', 
score=1.5084441), Row(did='3', score=0.8835226), Row(did='1', 
score=0.8835226)], 1),
+        ]
+
+        df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+        are_equal = self.compare_output_and_expected_and_cleanup(cfg, 
'test_run_2', df_output, _expected_output)
+        self.assertTrue(are_equal)
+
+    def test_run_3(self):
+        cfg = {
+            'score_matrix_table': {
+                'did_bucket_size': 2,
+                'did_bucket_step': 2,
+                'score_matrix_table': random_string_generator(10)
+            },
+
+            'top_n_similarity': {'did_bucket_size': 2,
+                                 'did_bucket_step': 2,
+                                 'cross_bucket_size': 2,
+                                 'top_n': 2,
+                                 'similarity_table': 
random_string_generator(10)}
+        }
+
+        _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 
0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)]
+        _expected_output = [
+            ('1', [Row(did='1', score=1.7316996), Row(did='3', 
score=1.7316996)], 0),
+            ('2', [Row(did='2', score=1.7320508), Row(did='4', 
score=1.5084441)], 0),
+            ('3', [Row(did='1', score=1.7316996), Row(did='3', 
score=1.7316996)], 1),
+            ('4', [Row(did='4', score=1.7320508), Row(did='2', 
score=1.5084441)], 1),
+        ]
+
+        df_output = self.run_top_n_similarity_table_generator(cfg, _input)
+        are_equal = self.compare_output_and_expected_and_cleanup(cfg, 
'test_run_3', df_output, _expected_output)
+        self.assertTrue(are_equal)
+
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
diff --git 
a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
 
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
new file mode 100644
index 0000000..d7a56ae
--- /dev/null
+++ 
b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py
@@ -0,0 +1,282 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from pyspark.sql.types import DataType, StringType, StructField, StructType, 
FloatType, IntegerType, ArrayType, MapType
+from lookalike_model.application.pipeline import util, 
top_n_similarity_table_generator
+
+
+class TestTopNSimilarityTableGenerator(unittest.TestCase):
+
+    def setUp (self):
+        # Set the log level.
+        self.sc = SparkContext.getOrCreate()
+        self.sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit 
test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(self.sc)
+
+    def test_run(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_top_n_similarity.yml', 'r') as 
ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+
+        # Get the names of the input and output tables.
+        # alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        top_n_table = cfg['top_n_similarity']['similarity_table']
+
+        # Create the input table.
+        # self.create_alpha_table(alpha_table)
+        self.create_matrix_table(matrix_table)
+
+        # Run the function being tested.
+        top_n_similarity_table_generator.run(self.spark, self.hive_context, 
cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by did""".format(top_n_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        # Validate the output.
+        self.validate_similarity_table(df, True)
+
+    def test_run2(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_top_n_similarity.yml', 'r') as 
ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+        cfg['top_n_similarity']['did_bucket_size'] = 2
+        cfg['top_n_similarity']['cross_bucket_size'] = 2
+
+        # Get the names of the input and output tables.
+        # alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        top_n_table = cfg['top_n_similarity']['similarity_table']
+
+        # Create the input table.
+        # self.create_alpha_table(alpha_table)
+        self.create_matrix_table2(matrix_table)
+
+        # Run the function being tested.
+        top_n_similarity_table_generator.run(self.spark, self.hive_context, 
cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by did""".format(top_n_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        # Validate the output.
+        self.validate_similarity_table(df)
+
+    def test_run3(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_top_n_similarity.yml', 'r') as 
ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+        cfg['top_n_similarity']['did_bucket_size'] = 2
+        cfg['top_n_similarity']['did_bucket_step'] = 2
+        cfg['top_n_similarity']['cross_bucket_size'] = 2
+
+        # Get the names of the input and output tables.
+        # alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        top_n_table = cfg['top_n_similarity']['similarity_table']
+
+        # Create the input table.
+        # self.create_alpha_table(alpha_table)
+        self.create_matrix_table2(matrix_table)
+
+        # Run the function being tested.
+        top_n_similarity_table_generator.run(self.spark, self.hive_context, 
cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by did""".format(top_n_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        # Validate the output.
+        self.validate_similarity_table(df)
+
+    def test_run4(self):
+        print('*** Running TestTopNSimilarityTableGenerator.test_run4 ***')
+
+        # Load the test configuration.
+        with open('application/pipeline/config_top_n_similarity.yml', 'r') as 
ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+        util.resolve_placeholder(cfg)
+        cfg['top_n_similarity']['did_bucket_size'] = 2
+        cfg['top_n_similarity']['did_bucket_step'] = 2
+        cfg['top_n_similarity']['cross_bucket_size'] = 2
+        cfg['top_n_similarity']['cross_bucket_step'] = 2
+
+        # Get the names of the input and output tables.
+        # alpha_table = 
cfg['score_vector_rebucketing']['score_vector_alpha_table']
+        matrix_table = cfg['score_matrix_table']['score_matrix_table']
+        top_n_table = cfg['top_n_similarity']['similarity_table']
+
+        # Create the input table.
+        # self.create_alpha_table(alpha_table)
+        self.create_matrix_table2(matrix_table)
+
+        # Run the function being tested.
+        top_n_similarity_table_generator.run(self.spark, self.hive_context, 
cfg)
+
+        # Load the output of the function.
+        command = """select * from {} order by did""".format(top_n_table)
+        df = self.hive_context.sql(command)
+        df.show()
+
+        # Validate the output.
+        self.validate_similarity_table(df)
+
+    def create_matrix_table(self, table_name):
+        data = [
+            (['0000001', '0000002', '0000003', '0000004'], 
+            [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2, 
0.3]], 
+            [1.46, 0.03, 1.46, 0.14], 
+            0)
+        ]
+
+        schema = StructType([
+            StructField("did_list", ArrayType(StringType(), True)),
+            StructField("score_matrix", ArrayType(ArrayType(FloatType(), 
True)), True),
+            StructField("c1_list", ArrayType(FloatType(), True)),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        util.write_to_table(df, table_name)
+
+    def create_matrix_table2(self, table_name):
+        data = [
+            (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], 
[1.46, 0.03], 0),
+            (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], 
[1.46, 0.14], 1)
+        ]
+
+        schema = StructType([
+            StructField("did_list", ArrayType(StringType(), True)),
+            StructField("score_matrix", ArrayType(ArrayType(FloatType(), 
True)), True),
+            StructField("c1_list", ArrayType(FloatType(), True)),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        util.write_to_table(df, table_name)
+
+    def create_alpha_table (self, table_name):
+        data = [
+            ('0000001', [0.1, 0.8, 0.9], 1.46, 0, 0),
+            ('0000002', [0.1, 0.1, 0.1], 0.03, 0, 1),
+            ('0000003', [0.1, 0.8, 0.9], 1.46, 1, 2),
+            ('0000004', [0.1, 0.2, 0.3], 0.14, 1, 3),
+        ]
+
+        schema = StructType([
+            StructField("did", StringType(), True),
+            StructField("score_vector", ArrayType(FloatType(), True), True),
+            StructField("c1", FloatType(), True),
+            StructField("did_bucket", IntegerType(), True),
+            StructField("alpha_did_bucket", IntegerType(), True)
+        ])
+
+        df = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        util.write_to_table(df, table_name)
+
+    def validate_similarity_table (self, df, same_bucket=False):
+        later_bucket = 1
+        if same_bucket:
+            later_bucket = 0
+        data = [
+            ('0000001', [{'did':'0000001', 'score':1.73205081}, 
{'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, 
{'did':'0000002', 'score':0.66903623}], 0),
+            ('0000002', [{'did':'0000002', 'score':1.73205081}, 
{'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, 
{'did':'0000003', 'score':0.66903623}], 0),
+            ('0000003', [{'did':'0000001', 'score':1.73205081}, 
{'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, 
{'did':'0000002', 'score':0.66903623}], later_bucket),
+            ('0000004', [{'did':'0000004', 'score':1.73205081}, 
{'did':'0000002', 'score':1.50844401}, {'did':'0000001', 'score':0.88532267}, 
{'did':'0000003', 'score':0.88532267}], later_bucket)
+        ]
+
+        # data = [
+        #     (['0000001', '0000002', '0000003', '0000004'], 
+        #     [[{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 
'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 
'score':0.66903623}], 
+        #      [{'did':'0000002', 'score':1.73205081}, {'did':'0000004', 
'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003', 
'score':0.66903623}], 
+        #      [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 
'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 
'score':0.66903623}], 
+        #      [{'did':'0000004', 'score':1.73205081}, {'did':'0000002', 
'score':1.50844401}, {'did':'0000001', 'score':0.88532267}, {'did':'0000003', 
'score':0.88532267}]], 
+        #     0)
+        # ]
+
+        schema = StructType([
+            StructField("did", StringType(), True),
+            StructField("top_n_similar_user", ArrayType(MapType(StringType(), 
StringType(), True), True)),
+            StructField("did_bucket", IntegerType(), True)
+        ])
+
+        df_ref = 
self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema)
+        # util.write_to_table(df, table_name)
+
+        # Verify the column names.
+        self.assertEqual(len(df.columns), len(df_ref.columns))
+        for name in df.columns:
+            self.assertIn(name, df_ref.columns)
+
+        # Verify the number of rows.
+        self.assertEqual(df.count(), df_ref.count())
+
+        # Check the row values.
+        for row, row_ref in zip(df.collect(), df_ref.collect()):
+            self.assertEqual(row['did'], row_ref['did'])
+            self.assertEqual(row['did_bucket'], row_ref['did_bucket'])
+            top_n = row['top_n_similar_user']
+            top_n_ref = row_ref['top_n_similar_user']
+            top_n_ordered = []
+            
+            # Convert the reference list into a list of score/[did] tuples.
+            for ref in top_n_ref:
+                if len(top_n_ordered) == 0 or float(ref['score']) != 
top_n_ordered[-1][0]:
+                    top_n_ordered.append((float(ref['score']), [ ref['did'] ]))
+                else:
+                    top_n_ordered[-1][1].append(ref['did'])
+            print(top_n_ordered)
+
+            # Verify the similarity order and values.            
+            index = 0
+            index_count = 0
+            for item in top_n:
+                self.assertAlmostEqual(item['score'], top_n_ordered[index][0], 
2)
+                self.assertIn(item['did'], top_n_ordered[index][1])
+                index_count += 1
+                if (index_count == len(top_n_ordered[index][1])):
+                    index += 1
+                    index_count = 0
+
+            
+                
+
+
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
\ No newline at end of file
diff --git 
a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md 
b/Model/lookalike-model/tests/pipeline/README.md
similarity index 100%
rename from 
Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md
rename to Model/lookalike-model/tests/pipeline/README.md
diff --git a/Model/lookalike-model/tests/pipeline/config_clean.yml 
b/Model/lookalike-model/tests/pipeline/config_clean.yml
index 8ae2c63..546a0b5 100644
--- a/Model/lookalike-model/tests/pipeline/config_clean.yml
+++ b/Model/lookalike-model/tests/pipeline/config_clean.yml
@@ -7,6 +7,8 @@ keywords_table: 'lookalike_unittest_clean_input_keywords'
 log:
   level: 'ERROR' # log level for spark and app
 pipeline:
+  main_keywords:
+    keyword_output_table: 'lookalike_unittest_clean_input_effective_keywords'
   main_clean:  
     did_bucket_num: 2 # Number of partitions for did
     load_logs_in_minutes: 1440 #1440/day, original=14400
diff --git a/Model/lookalike-model/tests/pipeline/config_keywords.yml 
b/Model/lookalike-model/tests/pipeline/config_keywords.yml
new file mode 100644
index 0000000..4f95b65
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/config_keywords.yml
@@ -0,0 +1,22 @@
+product_tag: 'lookalike'
+pipeline_tag: 'unittest'
+persona_table_name: 'lookalike_unittest_keywords_input_persona'
+showlog_table_name: 'lookalike_unittest_keywords_input_showlog'
+clicklog_table_name: 'lookalike_unittest_keywords_input_clicklog'
+keywords_table: 'lookalike_unittest_keywords_input_keywords'
+log:
+  level: 'ERROR' # log level for spark and app
+pipeline:
+  main_keywords:
+    keyword_output_table: 'lookalike_unittest_keywords_output_keywords'
+    keyword_threshold: 0.1  # Portion of showlog traffic that a keyword must 
reach to be included.
+  main_clean:  
+    did_bucket_num: 2 # Number of partitions for did
+    load_logs_in_minutes: 1440 #1440/day, original=14400
+    create_keywords: False # set True for first run, then keep False to use 
the created table.
+    conditions: {
+      'starting_date': '2020-01-01',
+      'ending_date': '2020-01-11'
+    }
+  cutting_date: 1584748800
+  length: 10
diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py 
b/Model/lookalike-model/tests/pipeline/data_generator.py
index 45711ce..59fc0ce 100644
--- a/Model/lookalike-model/tests/pipeline/data_generator.py
+++ b/Model/lookalike-model/tests/pipeline/data_generator.py
@@ -68,6 +68,20 @@ def create_unified_log_table (spark, table_name):
     df = create_unified_log(spark)
     write_to_table(df, table_name)
 
+# Creates raw clicklog data and writes it to Hive.
+def create_keywords_showlog_table (spark, table_name):
+    df = create_keywords_raw_log(spark)
+    df = df.withColumnRenamed('media', 'adv_type')
+    df = df.withColumnRenamed('price_model', 'adv_bill_mode_cd')
+    df = df.withColumnRenamed('action_time', 'show_time')
+    df.printSchema()
+    write_to_table(df, table_name)
+
+# Creates the effective keywords data and writes is to Hive.
+def create_effective_keywords_table(spark, table_name):
+    df = create_effective_keywords(spark)
+    write_to_table(df, table_name)
+
 #==========================================
 # Create dataframes for the unit tests
 #==========================================
@@ -164,20 +178,6 @@ def create_raw_log (spark):
 def create_cleaned_log (spark):
     data = [
         ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
-        # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', 
'2020-01-01', '1', ),
         ('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', 
'2020-01-02', '1', ),
         ('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 
'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', 
'2020-01-03', '1', ),
         ('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 
'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', 
'2020-01-04', '1', ),
@@ -227,7 +227,17 @@ def create_keywords(spark):
         ('reading', 'C021', 3),
         ('reading', 'C022', 3),
         ('reading', 'C023', 3),
-        ('reading', 'C024', 3)
+        ('reading', 'C024', 3),
+        ('shopping', 'C030', 4),
+        ('shopping', 'C031', 4),
+        ('shopping', 'C032', 4),
+        ('shopping', 'C033', 4),
+        ('shopping', 'C034', 4),
+        ('education', 'C040', 5),
+        ('education', 'C041', 5),
+        ('education', 'C042', 5),
+        ('education', 'C043', 5),
+        ('education', 'C044', 5),
     ]
 
     schema = StructType([
@@ -360,6 +370,53 @@ def create_trainready_filter_user_data (spark):
 
     return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
 
+# Returns a dataframe with unclean log data.
+def create_keywords_raw_log (spark):
+    data = [
+        ('0000001', '1000', 'splash', 'abcdef0', 'C000', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-01 12:34:56.78'), # travel
+        ('0000002', '1000', 'splash', 'abcdef1', 'C001', 'DUB-AL00', 'WIFI', 
'CPC', '2020-01-02 12:34:56.78'), # travel
+        ('0000003', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000005', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000006', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000007', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000008', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000009', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000010', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000011', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000012', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000013', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel
+        ('0000014', '1001', 'native', 'abcdef2', 'C010', 'ABC-AL00',   '4G', 
'CPD', '2020-01-03 12:34:56.78'), # game-avg
+        ('0000004', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00',   '4G', 
'CPD', '2020-01-04 12:34:56.78'), # game-avg
+        ('0000005', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00',   '4G', 
'CPD', '2020-01-05 12:34:56.78'), # game-avg
+        ('0000007', '1003', 'splash', 'abcdef6', 'C020', 'XYZ-AL00',   '4G', 
'CPT', '2020-01-07 12:34:56.78'), # reading; only one entry for this keyword so 
will be excluded.
+        ('0000008', '1003', 'splash', 'abcdef6', 'C030', 'XYZ-AL00',   '4G', 
'CPT', '2020-01-08 12:34:56.78'), # shopping; only one entry for this keyword 
so will be excluded.
+        ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00',   '4G', 
'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be 
included.
+        ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00',   '4G', 
'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be 
included.
+        ('0000010', '1003', 'splash', 'abcdef6', 'C050', 'XYZ-AL00',   '4G', 
'CPT', '2020-01-10 12:34:56.78'), # no mapping; only one entry for this keyword 
so will be excluded.
+        ('0000001', '1000', 'native', 'abcde10', 'C020', 'JKL-AL00',   '4G', 
'CPD', '2020-01-11 12:34:56.78'), # reading; outside the date range.
+    ]
+
+    schema = StructType([
+        StructField("did", StringType(), True),
+        StructField("adv_id", StringType(), True),
+        StructField("media", StringType(), True),
+        StructField("slot_id", StringType(), True),
+        StructField("spread_app_id", StringType(), True),
+        StructField("device_name", StringType(), True),
+        StructField("net_type", StringType(), True),
+        StructField("price_model", StringType(), True),
+        StructField("action_time", StringType(), True)
+    ])
+
+    return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+# Returns a dataframe with the effective keyword data.
+def create_effective_keywords(spark):
+    data = [('travel',), ('game-avg',), ('education',)]
+    schema = StructType([
+        StructField("keyword", StringType(), True)
+    ])
+    return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
 
 
 # Prints to screen the code to generate the given data frame.
diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py 
b/Model/lookalike-model/tests/pipeline/test_main_clean.py
index 3102fa3..c302479 100644
--- a/Model/lookalike-model/tests/pipeline/test_main_clean.py
+++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py
@@ -18,8 +18,8 @@ import unittest
 import yaml
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, HiveContext
-from pyspark.sql.functions import col
-from pyspark.sql.types import IntegerType
+from pyspark.sql.functions import col, udf, collect_set
+from pyspark.sql.types import IntegerType, BooleanType
 from lookalike_model.pipeline import main_clean, util
 from data_generator import *
 
@@ -163,10 +163,12 @@ class TestMainClean(unittest.TestCase):
         keywords_table = cfg['keywords_table']
         showlog_table = cfg['showlog_table_name']
         clicklog_table = cfg['clicklog_table_name']
+        effective_keywords_table = 
cfg['pipeline']['main_keywords']['keyword_output_table']
         create_persona_table(self.spark, persona_table)
         create_keywords_table(self.spark, keywords_table)
         create_clicklog_table(self.spark, clicklog_table)
         create_showlog_table(self.spark, showlog_table)
+        create_effective_keywords_table(self.spark, effective_keywords_table)
 
         # Drop the output tables
         showlog_output_table = 
cfg['pipeline']['main_clean']['showlog_output_table']
@@ -184,22 +186,32 @@ class TestMainClean(unittest.TestCase):
         bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']
         df_keywords = util.load_df(self.hive_context, keywords_table)
 
+        # run() does filtering on the effective keywords so we need to filter 
+        # the raw logs with the spread app ids when validating the output.
+        effective_spread_app_ids = ['C000', 'C001', 'C002', 'C003', 'C004', 
'C010', 'C011', 'C012', 'C013', 'C014', ]
+        df_log = create_raw_log(self.spark)
+        df_log = self.filter_spread_app_ids(df_log, effective_spread_app_ids)
+
         # Validate the cleaned persona table.
         df_persona = util.load_df(self.hive_context, persona_output_table)
         self.validate_clean_persona(df_persona, bucket_num)
 
         # Validate the cleaned clicklog table.
         df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
-        print_df_generator_code(df_clicklog.sort('did'))
-        df_log = create_raw_log(self.spark)
         self.validate_cleaned_log(df_clicklog, conditions, df_persona, 
df_keywords, df_log, bucket_num)
+        print_df_generator_code(df_clicklog.sort('did'))
 
         # Validate the cleaned showlog table.
         df_showlog = util.load_df(self.hive_context, clicklog_output_table)
-        print_df_generator_code(df_showlog.sort('did'))
-        df_log = create_raw_log(self.spark)
         self.validate_cleaned_log(df_showlog, conditions, df_persona, 
df_keywords, df_log, bucket_num)
+        print_df_generator_code(df_showlog.sort('did'))
+
+    def filter_spread_app_ids(self, df, spread_app_ids):
+        # User defined function to return if the keyword is in the inclusion 
set.
+        _udf = udf(lambda x: x in spread_app_ids, BooleanType())
 
+        # Return the filtered dataframe.
+        return df.filter(_udf(col('spread_app_id')))
 
     #========================================
     # Helper methods
diff --git a/Model/lookalike-model/tests/pipeline/test_main_keywords.py 
b/Model/lookalike-model/tests/pipeline/test_main_keywords.py
new file mode 100644
index 0000000..6a50389
--- /dev/null
+++ b/Model/lookalike-model/tests/pipeline/test_main_keywords.py
@@ -0,0 +1,88 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+ 
+#  http://www.apache.org/licenses/LICENSE-2.0.html
+
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+import yaml
+from pyspark import SparkContext
+from pyspark.sql import SparkSession, HiveContext
+from lookalike_model.pipeline import main_keywords, util
+from data_generator import *
+
+class TestMainKeywords(unittest.TestCase):
+
+    def setUp (self):
+        # Set the log level.
+        sc = SparkContext.getOrCreate()
+        sc.setLogLevel('ERROR')
+
+        # Initialize the Spark session
+        self.spark = SparkSession.builder.appName('unit 
test').enableHiveSupport().getOrCreate()
+        self.hive_context = HiveContext(sc)
+
+    def test_run(self):
+        print('*** Running test_run ***')
+        with open('pipeline/config_keywords.yml', 'r') as ymlfile:
+            cfg = yaml.safe_load(ymlfile)
+
+        util.resolve_placeholder(cfg)
+
+        # Create the input tables.
+        cfg_clean = cfg['pipeline']['main_clean']
+        showlog_table = cfg['showlog_table_name']
+        keywords_mapping_table = cfg['keywords_table']
+        create_keywords_mapping = cfg_clean['create_keywords']
+        create_keywords_table(self.spark, keywords_mapping_table)
+        create_keywords_showlog_table(self.spark, showlog_table)
+
+        # Drop the output tables.
+        cfg_keywords = cfg['pipeline']['main_keywords']
+        keyword_threshold = cfg_keywords['keyword_threshold']
+        effective_keywords_table = cfg_keywords['keyword_output_table']
+        util.drop_table(self.hive_context, effective_keywords_table)
+
+        start_date, end_date, load_minutes = util.load_batch_config(cfg)
+
+        main_keywords.run(self.hive_context, showlog_table, 
keywords_mapping_table, create_keywords_mapping, 
+            start_date, end_date, load_minutes, keyword_threshold, 
effective_keywords_table)
+
+        df_keywords = util.load_df(self.hive_context, effective_keywords_table)
+
+        self.validate_effective_keywords(df_keywords)
+
+
+    def validate_effective_keywords(self, df):
+        # Check the column names.
+        self.assertTrue('keyword' in df.columns)
+
+        # Expected keywords in the dataframe.
+        keywords_match = ['education', 'travel', 'game-avg']
+
+        # Check number of rows.
+        self.assertEqual(df.count(), len(keywords_match))
+
+        df.show()
+
+        # Check the values of the rows.
+        # keywords = df.agg(collect_list('keyword')).collect()
+        # for value in keywords_match:
+        #     self.assertTrue(value in keywords)
+        for row in df.collect():
+            self.assertIn(row['keyword'], keywords_match)
+
+# Runs the tests.
+if __name__ == '__main__':
+    # Run the unit tests.
+    unittest.main()
diff --git a/Model/lookalike-model/tests/run_test.sh 
b/Model/lookalike-model/tests/run_test.sh
index e4fdff9..e360cdf 100644
--- a/Model/lookalike-model/tests/run_test.sh
+++ b/Model/lookalike-model/tests/run_test.sh
@@ -3,6 +3,12 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd $DIR
 
+# test_main_logs: identifies the keywords that have traffic greater than a set 
percentage.
+if false
+then
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
pipeline/test_main_keywords.py
+fi
+
 # test_main_clean: preparing cleaned persona, click and show logs data.
 if false
 then
@@ -15,8 +21,21 @@ then
     spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
pipeline/test_main_logs.py
 fi
 
-# test_main_logs: merges click and show log data.
-if true
+# test_main_trainready: aggregates the click and show log data by user.
+if false
 then
     spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
pipeline/test_main_trainready.py
 fi
+
+# test_score_matrix_table: Converts the score vector table into matrices.
+if true
+then
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
application/pipeline/test_score_matrix_table.py
+fi
+
+# test_top_n_similarity_table_generator: Finds the top n users similar to the 
given user.
+if false
+then
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
application/pipeline/test_top_n_similarity_table_generator_1.py
+    spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf 
spark.hadoop.hive.exec.dynamic.partition=true --conf 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
application/pipeline/test_top_n_similarity_table_generator_2.py
+fi

Reply via email to