This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 8cbc85a [SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes)
8cbc85a is described below
commit 8cbc85a949b3699cde8ed3cf3e3abec6a27fbc60
Author: gilgenbergg <[email protected]>
AuthorDate: Sun May 3 17:17:57 2020 +0200
[SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes)
Closes #881.
---
docs/Tasks.txt | 3 +-
scripts/staging/hmm/HMM.py | 2 -
scripts/staging/slicing/__init__.py | 0
scripts/staging/slicing/base/Bucket.py | 168 +++++++++++++++++++++
.../staging/slicing/base/{node.py => SparkNode.py} | 79 ++++++----
scripts/staging/slicing/base/__init__.py | 0
scripts/staging/slicing/base/node.py | 28 +++-
scripts/staging/slicing/base/slicer.py | 135 +++++++++--------
.../base/tests/classification/test_adult.py | 101 -------------
.../slicing/base/tests/classification/test_iris.py | 88 -----------
.../base/tests/regression/test_insurance.py | 81 ----------
.../slicing/base/tests/regression/test_salary.py | 87 -----------
scripts/staging/slicing/base/top_k.py | 7 +-
scripts/staging/slicing/base/union_slicer.py | 78 ++++------
.../slicing/spark_modules/join_data_parallel.py | 120 +++++++++++++++
.../staging/slicing/spark_modules/spark_slicer.py | 100 ++++++++++++
.../slicing/spark_modules/spark_union_slicer.py | 70 +++++++++
.../staging/slicing/spark_modules/spark_utils.py | 141 +++++++++++++++++
.../slicing/spark_modules/union_data_parallel.py | 119 +++++++++++++++
scripts/staging/slicing/tests/__init__.py | 0
.../slicing/tests/classification/__init__.py | 0
.../slicing/tests/classification/sparked_adults.py | 118 +++++++++++++++
.../slicing/tests/classification/test_adult.py | 121 +++++++++++++++
.../slicing/tests/classification/test_iris.py | 109 +++++++++++++
.../staging/slicing/tests/regression/__init__.py | 0
.../slicing/tests/regression/bd_spark_salary.py | 131 ++++++++++++++++
.../slicing/tests/regression/spark_salary.py | 123 +++++++++++++++
.../slicing/tests/regression/test_insurance.py | 103 +++++++++++++
.../slicing/tests/regression/test_salary.py | 104 +++++++++++++
29 files changed, 1717 insertions(+), 499 deletions(-)
diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 9d2dba4..9fa9a6f 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -203,7 +203,8 @@ SYSTEMDS-240 GPU Backend Improvements
SYSTEMDS-250 Extended Slice Finding
* 251 Alternative slice enumeration approach OK
- * 252 Initial data slicing implementation Python
+ * 252 Initial data slicing implementation Python OK
+ * 253 Distributed slicing algorithms (task/data parallel) OK
SYSTEMDS-260 Misc Tools
* 261 Stable marriage algorithm OK
diff --git a/scripts/staging/hmm/HMM.py b/scripts/staging/hmm/HMM.py
index 61fa0d0..d9eb187 100644
--- a/scripts/staging/hmm/HMM.py
+++ b/scripts/staging/hmm/HMM.py
@@ -19,8 +19,6 @@
#
#-------------------------------------------------------------
-#Author: Afan Secic
-
from bs4 import BeautifulSoup,SoupStrainer
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
diff --git a/scripts/staging/slicing/__init__.py
b/scripts/staging/slicing/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/base/Bucket.py
b/scripts/staging/slicing/base/Bucket.py
new file mode 100644
index 0000000..0277f6d
--- /dev/null
+++ b/scripts/staging/slicing/base/Bucket.py
@@ -0,0 +1,168 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+class Bucket:
+
+ key: []
+ attributes: []
+ name: ""
+ error: float
+ e_upper: float
+ size: float
+ sum_error: float
+ max_tuple_error: float
+ s_upper: float
+ s_lower: float
+ e_max: float
+ e_max_upper: float
+ score: float
+ c_upper: float
+ parents: []
+ x_size: int
+ loss: float
+ w: float
+
+ def __init__(self, node, cur_lvl, w, x_size, loss):
+ self.attributes = []
+ self.parents = []
+ self.sum_error = 0
+ self.size = 0
+ self.score = 0
+ self.error = 0
+ self.max_tuple_error = 0
+ self.x_size = x_size
+ self.w = w
+ self.loss = loss
+ if cur_lvl == 0:
+ self.key = node
+ self.attributes.append(node)
+ else:
+ self.attributes = node.attributes
+ self.key = node.attributes
+ self.name = self.make_name()
+ self.__hash__()
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def __add__(self, other):
+ self.size += other.size
+ self.sum_error += other.sum_error
+ return self
+
+ def combine_with(self, other):
+ self.size = max(self.size, other.size)
+ self.sum_error = max(self.sum_error, other.sum_error)
+ return self
+
+ def minimize_bounds(self, other):
+ minimized = min(self.s_upper, other.s_upper)
+ self.s_upper = minimized
+ minimized = min(other.s_lower, self.s_lower)
+ self.s_lower = minimized
+ minimized = min(other.e_upper, self.e_upper)
+ self.e_upper = minimized
+ minimized = min(other.e_max_upper, self.e_max_upper)
+ self.e_max_upper = minimized
+ c_upper = self.calc_c_upper(self.w, self.x_size, self.loss)
+ minimized = min(c_upper, self.c_upper)
+ self.c_upper = minimized
+
+ def __eq__(self, other):
+ return (
+ self.__hash__ == other.__hash__ and self.key == other.key)
+
+ def update_metrics(self, row, loss_type):
+ self.size += 1
+ if loss_type == 0:
+ self.sum_error += row[2]
+ if row[2] > self.max_tuple_error:
+ self.max_tuple_error = row[2]
+ else:
+ if row[2] != 0:
+ self.sum_error += 1
+
+ def calc_error(self):
+ if self.size != 0:
+ self.error = self.sum_error / self.size
+ else:
+ self.error = 0
+ self.e_max = self.max_tuple_error
+ self.e_max_upper = self.e_max
+ self.e_upper = self.error
+
+ def check_constraint(self, top_k, x_size, alpha):
+ return self.score >= top_k.min_score and self.size >= x_size / alpha
+
+ def make_name(self):
+ name = ""
+ for attribute in self.attributes:
+ name = name + str(attribute) + " && "
+ name = name[0: len(name) - 4]
+ return name
+
+ def calc_bounds(self, w, x_size, loss):
+ self.s_upper = self.calc_s_upper()
+ self.s_lower = self.calc_s_lower(x_size)
+ self.e_upper = self.calc_e_upper()
+ self.e_max_upper = self.calc_e_max_upper()
+ self.c_upper = self.calc_c_upper(w, x_size, loss)
+
+ def check_bounds(self, x_size, alpha, top_k):
+        return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
+
+ def calc_s_upper(self):
+ cur_min = self.parents[0].size
+ for parent in self.parents:
+ cur_min = min(cur_min, parent.s_upper)
+ return cur_min
+
+ def calc_e_max_upper(self):
+ e_max_min = self.parents[0].e_max_upper
+ for parent in self.parents:
+ e_max_min = min(e_max_min, parent.e_max_upper)
+ return e_max_min
+
+ def calc_s_lower(self, x_size):
+ size_value = x_size
+ for parent in self.parents:
+ size_value = size_value - (size_value - parent.s_lower)
+ return max(size_value, 1)
+
+ def calc_e_upper(self):
+ prev_e_uppers = []
+ for parent in self.parents:
+ prev_e_uppers.append(parent.e_upper)
+ return float(min(prev_e_uppers))
+
+ def calc_c_upper(self, w, x_size, loss):
+ upper_score = w * (self.e_upper / self.s_lower) / (loss / x_size) + (
+ 1 - w) * self.s_upper
+ return float(upper_score)
+
+ def print_debug(self, topk):
+ print("new node has been created: " + self.make_name() + "\n")
+ print("s_upper = " + str(self.s_upper))
+ print("s_lower = " + str(self.s_lower))
+ print("e_upper = " + str(self.e_upper))
+ print("c_upper = " + str(self.c_upper))
+ print("current topk min score = " + str(topk.min_score))
+        print("-------------------------------------------------------------------------------------")
diff --git a/scripts/staging/slicing/base/node.py
b/scripts/staging/slicing/base/SparkNode.py
similarity index 70%
copy from scripts/staging/slicing/base/node.py
copy to scripts/staging/slicing/base/SparkNode.py
index 6acd65a..fbaa0bd 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -19,7 +19,7 @@
#
#-------------------------------------------------------------
-class Node:
+class SparkNode:
error: float
name: ""
attributes: []
@@ -36,34 +36,33 @@ class Node:
e_max_upper: float
key: ""
- def __init__(self, all_features, model, complete_x, loss, x_size, y_test, preds):
- self.error = loss,
+ def __init__(self, loss, preds):
+ if loss:
+ self.error = loss,
+ if preds:
+ self.preds = preds
self.parents = []
self.attributes = []
self.size = 0
self.score = 0
- self.model = model
- self.complete_x = complete_x
self.loss = 0
- self.x_size = x_size
- self.preds = preds
self.s_lower = 1
- self.y_test = y_test
- self.all_features = all_features
self.key = ''
def calc_c_upper(self, w):
- upper_score = w * (self.e_upper / self.s_lower) / (float(self.error[0]) / self.x_size) + (1 - w) * self.s_upper
+ upper_score = w * (self.e_upper / self.s_lower) / (self.error[0] / self.preds[0][0].size) + (1 - w) * self.s_upper
return float(upper_score)
def make_slice_mask(self):
mask = []
for feature in self.attributes:
- mask.append(feature[1])
+ mask.append(feature)
return mask
def process_slice(self, loss_type):
mask = self.make_slice_mask()
+ print("mask")
+ print(mask)
if loss_type == 0:
self.calc_l2(mask)
if loss_type == 1:
@@ -73,15 +72,15 @@ class Node:
self.e_max = 1
size = 0
mistakes = 0
- for row in self.complete_x:
+ for row in self.preds:
flag = True
for attr in mask:
- if row[1][attr] == 0:
+ if attr not in row[0].indices:
flag = False
if flag:
size = size + 1
- if self.y_test[row[0]][1] != self.preds[row[0]][1]:
- mistakes = mistakes + 1
+ if row[1] == 0:
+ mistakes += 1
self.size = size
if size != 0:
self.loss = mistakes / size
@@ -93,23 +92,25 @@ class Node:
max_tuple_error = 0
sum_error = 0
size = 0
- for row in self.complete_x:
+ for row in self.preds:
flag = True
for attr in mask:
- if row[1][attr] == 0:
+ if attr not in row[0].indices:
flag = False
if flag:
size = size + 1
- if float(self.preds[row[0]][1]) > max_tuple_error:
- max_tuple_error = float(self.preds[row[0]][1])
- sum_error = sum_error + float(self.preds[row[0]][1])
+ if row[1] > max_tuple_error:
+ max_tuple_error = row[1]
+ sum_error = sum_error + row[1]
self.e_max = max_tuple_error
self.e_upper = max_tuple_error
+ self.e_max_upper = max_tuple_error
if size != 0:
self.loss = sum_error/size
else:
self.loss = 0
self.size = size
+ self.s_upper = size
def calc_s_upper(self, cur_lvl):
cur_min = self.parents[0].size
@@ -132,10 +133,10 @@ class Node:
e_max_min = min(e_max_min, parent.e_max_upper)
return e_max_min
- def calc_s_lower(self, cur_lvl):
- size_value = self.x_size
+ def calc_s_lower(self):
+ size_value = len(self.preds)
for parent in self.parents:
- size_value = size_value - (self.x_size - parent.s_lower)
+ size_value = size_value - (size_value - parent.s_lower)
return max(size_value, 1)
def calc_e_upper(self):
@@ -146,7 +147,7 @@ class Node:
def calc_bounds(self, cur_lvl, w):
self.s_upper = self.calc_s_upper(cur_lvl)
- self.s_lower = self.calc_s_lower(cur_lvl)
+ self.s_lower = self.calc_s_lower()
self.e_upper = self.calc_e_upper()
self.e_max_upper = self.calc_e_max_upper(cur_lvl)
self.c_upper = self.calc_c_upper(w)
@@ -154,12 +155,12 @@ class Node:
def make_name(self):
name = ""
for attribute in self.attributes:
- name = name + str(attribute[0]) + " && "
+ name = name + str(attribute) + " && "
name = name[0: len(name) - 4]
return name
- def make_key(self, new_id):
- return new_id, self.name
+ def make_key(self):
+ return self.name
def check_constraint(self, top_k, x_size, alpha):
return self.score >= top_k.min_score and self.size >= x_size / alpha
@@ -167,6 +168,30 @@ class Node:
def check_bounds(self, top_k, x_size, alpha):
return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
+ def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
+ try:
+ minimized = min(s_upper, self.s_upper)
+ self.s_upper = minimized
+ minimized = min(s_lower, self.s_lower)
+ self.s_lower = minimized
+ minimized = min(e_upper, self.e_upper)
+ self.e_upper = minimized
+ minimized = min(e_max_upper, self.e_max_upper)
+ self.e_max_upper = minimized
+ c_upper = self.calc_c_upper(w)
+ minimized = min(c_upper, self.c_upper)
+ self.c_upper = minimized
+ except AttributeError:
+ # initial bounds calculation
+ self.s_upper = s_upper
+ self.s_lower = s_lower
+ self.e_upper = e_upper
+ self.e_max_upper = e_max_upper
+ c_upper = self.calc_c_upper(w)
+ self.c_upper = c_upper
+ minimized = min(c_upper, self.c_upper)
+ self.c_upper = minimized
+
def print_debug(self, topk, level):
print("new node has been created: " + self.make_name() + "\n")
if level >= 1:
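
As a reading aid, a small local sketch of the refactored SparkNode above: preds is an iterable of (sparse features, error) pairs and only the .indices attribute of the feature vector is touched, so a tiny stand-in class suffices; feature ids and errors are made up.

from collections import namedtuple

from slicing.base.SparkNode import SparkNode

Sparse = namedtuple("Sparse", ["indices"])         # stand-in for a SparseVector
preds = [(Sparse([0, 2]), 0.40), (Sparse([0, 3]), 0.10), (Sparse([1, 2]), 0.90)]
f_l2 = sum(e for _, e in preds) / len(preds)

node = SparkNode(f_l2, preds)
node.attributes = [0]                              # slice defined by feature id 0
node.name = node.make_name()
node.key = node.make_key()
node.process_slice(loss_type=0)                    # L2 path: mean error over matching rows
print(node.name, node.size, node.loss)             # "0", 2, (0.40 + 0.10) / 2
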
diff --git a/scripts/staging/slicing/base/__init__.py
b/scripts/staging/slicing/base/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/base/node.py
b/scripts/staging/slicing/base/node.py
index 6acd65a..33091a5 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -36,20 +36,18 @@ class Node:
e_max_upper: float
key: ""
- def __init__(self, all_features, model, complete_x, loss, x_size, y_test, preds):
+ def __init__(self, complete_x, loss, x_size, y_test, preds):
self.error = loss,
self.parents = []
self.attributes = []
self.size = 0
self.score = 0
- self.model = model
self.complete_x = complete_x
self.loss = 0
self.x_size = x_size
self.preds = preds
self.s_lower = 1
self.y_test = y_test
- self.all_features = all_features
self.key = ''
def calc_c_upper(self, w):
@@ -167,6 +165,30 @@ class Node:
def check_bounds(self, top_k, x_size, alpha):
return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
+ def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
+ try:
+ minimized = min(s_upper, self.s_upper)
+ self.s_upper = minimized
+ minimized = min(s_lower, self.s_lower)
+ self.s_lower = minimized
+ minimized = min(e_upper, self.e_upper)
+ self.e_upper = minimized
+ minimized = min(e_max_upper, self.e_max_upper)
+ self.e_max_upper = minimized
+ c_upper = self.calc_c_upper(w)
+ minimized = min(c_upper, self.c_upper)
+ self.c_upper = minimized
+ except AttributeError:
+ # initial bounds calculation
+ self.s_upper = s_upper
+ self.s_lower = s_lower
+ self.e_upper = e_upper
+ self.e_max_upper = e_max_upper
+ c_upper = self.calc_c_upper(w)
+ self.c_upper = c_upper
+ minimized = min(c_upper, self.c_upper)
+ self.c_upper = minimized
+
def print_debug(self, topk, level):
print("new node has been created: " + self.make_name() + "\n")
if level >= 1:
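
A minimal local sketch of the new Node.update_bounds above: the first call installs the bounds through the AttributeError branch, later calls tighten each bound by taking the minimum; constructor arguments and numbers are placeholders only.

from slicing.base.node import Node

node = Node(complete_x=[], loss=0.4, x_size=100, y_test=[], preds=[])
node.update_bounds(s_upper=50, s_lower=1, e_upper=0.9, e_max_upper=1.0, w=0.5)
print(node.s_upper, node.c_upper)    # bounds installed on the first call
node.update_bounds(s_upper=30, s_lower=1, e_upper=0.8, e_max_upper=1.0, w=0.5)
print(node.s_upper, node.c_upper)    # tightened to the smaller values
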
diff --git a/scripts/staging/slicing/base/slicer.py
b/scripts/staging/slicing/base/slicer.py
index 4bc3415..be967b6 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -19,8 +19,8 @@
#
#-------------------------------------------------------------
-from slicing.node import Node
-from slicing.top_k import Topk
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
# optimization function calculation:
@@ -33,7 +33,7 @@ def opt_fun(fi, si, f, x_size, w):
# slice_name_nonsense function defines if combination of nodes on current level is fine or impossible:
-# there is a dependency between common nodes' attributes number and current level is such that:
+# there is dependency between common nodes' attributes number and current level is such that:
# commons == cur_lvl - 1
# valid combination example: node ABC + node BCD (on 4th level) // three attributes nodes have two common attributes
# invalid combination example: node ABC + CDE (on 4th level) // result node - ABCDE (absurd for 4th level)
@@ -51,21 +51,13 @@ def union(lst1, lst2):
return final_list
-# alpha is size significance coefficient (required for optimization function)
-# verbose option is for returning debug info while creating slices and printing it (in console)
-# k is number of top-slices we want to receive as a result (maximum output, if less all of subsets will be printed)
-# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function
-# loss_type = 0 (in case of regression model)
-# loss_type = 1 (in case of classification model)
-def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type):
- counter = 0
- # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
+def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w):
first_level = []
- levels = []
+ counter = 0
all_nodes = {}
- top_k = Topk(k)
+ # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
for feature in all_features:
- new_node = Node(all_features, model, complete_x, loss, x_size, y_test, errors)
+ new_node = Node(complete_x, loss, x_size, y_test, errors)
new_node.parents = [(feature, counter)]
new_node.attributes.append((feature, counter))
new_node.name = new_node.make_name()
@@ -82,11 +74,69 @@ def process(all_features, model, complete_x, loss, x_size,
y_test, errors, debug
# this method updates top k slices if needed
top_k.add_new_top_slice(new_node)
counter = counter + 1
- levels.append(first_level)
+ return first_level, all_nodes
+
+def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type, b_update, cur_lvl,
+ all_nodes, top_k, cur_lvl_nodes):
+ for node_j in range(len(prev_lvl)):
+ flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
+ if not flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+ new_node = Node(complete_x, loss, x_size, y_test, errors)
+ parents_set = set(new_node.parents)
+ parents_set.add(prev_lvl[node_i])
+ parents_set.add(prev_lvl[node_j])
+ new_node.parents = list(parents_set)
+ parent1_attr = prev_lvl[node_i].attributes
+ parent2_attr = prev_lvl[node_j].attributes
+ new_node_attr = union(parent1_attr, parent2_attr)
+ new_node.attributes = new_node_attr
+ new_node.name = new_node.make_name()
+ new_id = len(all_nodes)
+ new_node.key = new_node.make_key(new_id)
+ if new_node.key[1] in all_nodes:
+ existing_item = all_nodes[new_node.key[1]]
+ parents_set = set(existing_item.parents)
+ existing_item.parents = parents_set
+ if b_update:
+ s_upper = new_node.calc_s_upper(cur_lvl)
+ s_lower = new_node.calc_s_lower(cur_lvl)
+ e_upper = new_node.calc_e_upper()
+ e_max_upper = new_node.calc_e_max_upper(cur_lvl)
+ new_node.update_bounds(s_upper, s_lower, e_upper,
e_max_upper, w)
+ else:
+ new_node.calc_bounds(cur_lvl, w)
+ all_nodes[new_node.key[1]] = new_node
+ # check if concrete data should be extracted or not (only for
those that have score upper
+ # big enough and if size of subset is big enough
+ to_slice = new_node.check_bounds(top_k, x_size, alpha)
+ if to_slice:
+ new_node.process_slice(loss_type)
+ new_node.score = opt_fun(new_node.loss, new_node.size,
loss, x_size, w)
+ # we decide to add node to current level nodes (in order
to make new combinations
+ # on the next one or not basing on its score value
+ if new_node.check_constraint(top_k, x_size, alpha) and
new_node.key not in top_k.keys:
+ top_k.add_new_top_slice(new_node)
+ cur_lvl_nodes.append(new_node)
+ if debug:
+ new_node.print_debug(top_k, cur_lvl)
+ return cur_lvl_nodes, all_nodes
+
+
+# alpha is size significance coefficient (required for optimization function)
+# verbose option is for returning debug info while creating slices and printing it (in console)
+# k is number of top-slices we want to receive as a result (maximum output, if less all of subsets will be printed)
+# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function
+# loss_type = 0 (in case of regression model)
+# loss_type = 1 (in case of classification model)
+def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type, b_update):
+ levels = []
+ top_k = Topk(k)
+ first_level = make_first_level(all_features, complete_x, loss, x_size,
y_test, errors, loss_type, top_k, alpha, w)
+ all_nodes = first_level[1]
+ levels.append(first_level[0])
# cur_lvl - index of current level, correlates with number of slice
forming features
cur_lvl = 1 # currently filled level after first init iteration
-
# currently for debug
print("Level 1 had " + str(len(all_features)) + " candidates")
print()
@@ -95,52 +145,17 @@ def process(all_features, model, complete_x, loss, x_size,
y_test, errors, debug
# combining each candidate of previous level with every till it becomes
useless (one node can't make a pair)
while len(levels[cur_lvl - 1]) > 1:
cur_lvl_nodes = []
- for node_i in range(len(levels[cur_lvl - 1]) - 1):
- for node_j in range(node_i + 1, len(levels[cur_lvl - 1])):
- flag = slice_name_nonsense(levels[cur_lvl - 1][node_i],
levels[cur_lvl - 1][node_j], cur_lvl)
- if not flag:
- new_node = Node(all_features, model, complete_x, loss,
x_size, y_test, errors)
- parents_set = set(new_node.parents)
- parents_set.add(levels[cur_lvl - 1][node_i])
- parents_set.add(levels[cur_lvl - 1][node_j])
- new_node.parents = list(parents_set)
- parent1_attr = levels[cur_lvl - 1][node_i].attributes
- parent2_attr = levels[cur_lvl - 1][node_j].attributes
- new_node_attr = union(parent1_attr, parent2_attr)
- new_node.attributes = new_node_attr
- new_node.name = new_node.make_name()
- new_id = len(all_nodes)
- new_node.key = new_node.make_key(new_id)
- if new_node.key[1] in all_nodes:
- existing_item = all_nodes[new_node.key[1]]
- parents_set = set(existing_item.parents)
- existing_item.parents = parents_set
- else:
- new_node.calc_bounds(cur_lvl, w)
- all_nodes[new_node.key[1]] = new_node
- # check if concrete data should be extracted or not
(only for those that have score upper
- # big enough and if size of subset is big enough
- to_slice = new_node.check_bounds(top_k, x_size, alpha)
- if to_slice:
- new_node.process_slice(loss_type)
- new_node.score = opt_fun(new_node.loss,
new_node.size, loss, x_size, w)
- # we decide to add node to current level nodes (in
order to make new combinations
- # on the next one or not basing on its score value
- if new_node.check_constraint(top_k, x_size, alpha)
and new_node.key not in top_k.keys:
- cur_lvl_nodes.append(new_node)
- top_k.add_new_top_slice(new_node)
- elif new_node.check_bounds(top_k, x_size, alpha):
- cur_lvl_nodes.append(new_node)
- else:
- if new_node.check_bounds(top_k, x_size, alpha):
- cur_lvl_nodes.append(new_node)
- if debug:
- new_node.print_debug(top_k, cur_lvl)
- print("Level " + str(cur_lvl + 1) + " had " + str(len(levels[cur_lvl -
1]) * (len(levels[cur_lvl - 1]) - 1)) +
- " candidates but after pruning only " + str(len(cur_lvl_nodes))
+ " go to the next level")
+ prev_lvl = levels[cur_lvl - 1]
+ for node_i in range(len(prev_lvl)):
+ partial = join_enum(node_i, prev_lvl, complete_x, loss, x_size,
y_test, errors, debug, alpha, w, loss_type,
+ b_update, cur_lvl, all_nodes, top_k,
cur_lvl_nodes)
+ cur_lvl_nodes = partial[0]
+ all_nodes = partial[1]
cur_lvl = cur_lvl + 1
levels.append(cur_lvl_nodes)
top_k.print_topk()
+ print("Level " + str(cur_lvl) + " had " + str(len(prev_lvl) *
(len(prev_lvl) - 1)) +
+ " candidates but after pruning only " + str(len(cur_lvl_nodes))
+ " go to the next level")
print("Program stopped at level " + str(cur_lvl + 1))
print()
print("Selected slices are: ")
diff --git a/scripts/staging/slicing/base/tests/classification/test_adult.py
b/scripts/staging/slicing/base/tests/classification/test_adult.py
deleted file mode 100644
index 247d5c4..0000000
--- a/scripts/staging/slicing/base/tests/classification/test_adult.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
-from sklearn.model_selection import train_test_split
-
-
-dataset = pd.read_csv('adult.csv')
-attributes_amount = len(dataset.values[0])
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-# enc = OneHotEncoder(handle_unknown='ignore')
-# x = enc.fit_transform(x).toarray()
-y = dataset.iloc[:, attributes_amount - 1]
-le = preprocessing.LabelEncoder()
-le.fit(y)
-y = le.transform(y)
-complete_x = []
-complete_y = []
-counter = 0
-all_indexes = []
-not_encoded_columns = [
- "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
- "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
- "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
-]
-for row in x:
- row[0] = int(row[0] / 10)
- row[2] = int(row[2]) // 100000
- row[4] = int(row[4] / 5)
- row[10] = int(row[10] / 1000)
- row[12] = int(row[12] / 10)
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-all_features = enc.get_feature_names()
-x_size = len(complete_x)
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2,
random_state=0)
-for item in x_test:
- complete_x.append((counter, item))
- complete_y.append((counter, y_test[counter]))
- counter = counter + 1
-x_size = counter
-clf = RandomForestClassifier(n_jobs=2, random_state=0)
-clf.fit(x_train, y_train)
-RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
- max_depth=None, max_features='auto', max_leaf_nodes=None,
- min_impurity_split=1e-07, min_samples_leaf=1,
- min_samples_split=2, min_weight_fraction_leaf=0.0,
- n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
- verbose=0, warm_start=False)
-
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance
propagated into optimization function
-# loss_type = 0 (l2 in case of regression model
-# loss_type = 1 (cross entropy in case of classification model)
-preds = clf.predict(x_test)
-predictions = []
-counter = 0
-mistakes = 0
-for pred in preds:
- predictions.append((counter, pred))
- if y_test[counter] != pred:
- mistakes = mistakes + 1
- counter = counter + 1
-lossF = mistakes / counter
-
-# enumerator <union>/<join> indicates an approach of next level slices
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
- slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y,
predictions, debug=True, alpha=4, k=10,
- w=0.5, loss_type=1)
-elif enumerator == "union":
- union_slicer.process(all_features, clf, complete_x, lossF, x_size,
complete_y, predictions, debug=True, alpha=4,
- k=10, w=0.5, loss_type=1)
diff --git a/scripts/staging/slicing/base/tests/classification/test_iris.py
b/scripts/staging/slicing/base/tests/classification/test_iris.py
deleted file mode 100644
index ce8effd..0000000
--- a/scripts/staging/slicing/base/tests/classification/test_iris.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
-from sklearn.model_selection import train_test_split
-
-
-dataset = pd.read_csv('iris.csv')
-attributes_amount = len(dataset.values[0])
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-y = dataset.iloc[:, attributes_amount - 1]
-le = preprocessing.LabelEncoder()
-le.fit(y)
-y = le.transform(y)
-complete_x = []
-complete_y = []
-counter = 0
-all_indexes = []
-all_features = enc.get_feature_names()
-x_size = len(complete_x)
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2,
random_state=0)
-for item in x_test:
- complete_x.append((counter, item))
- complete_y.append((counter, y_test[counter]))
- counter = counter + 1
-x_size = counter
-clf = RandomForestClassifier(n_jobs=2, random_state=0)
-clf.fit(x_train, y_train)
-RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
- max_depth=None, max_features='auto', max_leaf_nodes=None,
- min_impurity_split=1e-07, min_samples_leaf=1,
- min_samples_split=2, min_weight_fraction_leaf=0.0,
- n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
- verbose=0, warm_start=False)
-
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance
propagated into optimization function
-# loss_type = 0 (l2 in case of regression model
-# loss_type = 1 (cross entropy in case of classification model)
-preds = clf.predict(x_test)
-predictions = []
-counter = 0
-mistakes = 0
-for pred in preds:
- predictions.append((counter, pred))
- if y_test[counter] != pred:
- mistakes = mistakes + 1
- counter = counter + 1
-lossF = mistakes / counter
-
-# enumerator <union>/<join> indicates an approach of next level slices
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "join"
-if enumerator == "join":
- slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y,
predictions, debug=True, alpha=6, k=10,
- w=0.5, loss_type=1)
-elif enumerator == "union":
- union_slicer.process(all_features, clf, complete_x, lossF, x_size,
complete_y, predictions, debug=True, alpha=6, k=10,
- w=0.5, loss_type=1)
diff --git a/scripts/staging/slicing/base/tests/regression/test_insurance.py
b/scripts/staging/slicing/base/tests/regression/test_insurance.py
deleted file mode 100644
index 34a5d37..0000000
--- a/scripts/staging/slicing/base/tests/regression/test_insurance.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.linear_model import LinearRegression
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-
-file_name = 'insurance.csv'
-dataset = pd.read_csv(file_name)
-attributes_amount = len(dataset.values[0])
-# for now working with regression datasets, assuming that target attribute is
the last one
-# currently non-categorical features are not supported and should be binned
-y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
-# starting with one not including id field
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-# list of numerical columns
-non_categorical = [1, 3]
-for row in x:
- for attribute in non_categorical:
- # <attribute - 2> as we already excluded from x id column
- row[attribute - 1] = int(row[attribute - 1] / 5)
-# hot encoding of categorical features
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-complete_x = []
-complete_y = []
-counter = 0
-all_features = enc.get_feature_names()
-# train model on a whole dataset
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3,
random_state=0)
-for item in x_test:
- complete_x.append((counter, item))
- complete_y.append((counter, y_test[counter]))
- counter = counter + 1
-x_size = counter
-model = LinearRegression()
-model.fit(x_train, y_train)
-preds = (model.predict(x_test) - y_test) ** 2
-f_l2 = sum(preds)/x_size
-errors = []
-counter = 0
-for pred in preds:
- errors.append((counter, pred))
- counter = counter + 1
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance
propagated into optimization function
-
-# enumerator <union>/<join> indicates an approach of next level slices
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
- slicer.process(all_features, model, complete_x, f_l2, x_size, y_test,
errors, debug=True, alpha=5, k=10,
- w=0.5, loss_type=0)
-elif enumerator == "union":
- union_slicer.process(all_features, model, complete_x, f_l2, x_size,
y_test, errors, debug=True, alpha=5, k=10,
- w=0.5, loss_type=0)
diff --git a/scripts/staging/slicing/base/tests/regression/test_salary.py
b/scripts/staging/slicing/base/tests/regression/test_salary.py
deleted file mode 100644
index c51f4b2..0000000
--- a/scripts/staging/slicing/base/tests/regression/test_salary.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.linear_model import LinearRegression
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-
-file_name = 'salary.csv'
-dataset = pd.read_csv(file_name)
-attributes_amount = len(dataset.values[0])
-# for now working with regression datasets, assuming that target attribute is
the last one
-# currently non-categorical features are not supported and should be binned
-y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
-# starting with one not including id field
-x = dataset.iloc[:, 1:attributes_amount - 1].values
-# list of numerical columns
-non_categorical = [4, 5]
-for row in x:
- for attribute in non_categorical:
- # <attribute - 2> as we already excluded from x id column
- row[attribute - 2] = int(row[attribute - 2] / 5)
-# hot encoding of categorical features
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-complete_x = []
-complete_y = []
-counter = 0
-all_features = enc.get_feature_names()
-# train model on a whole dataset
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3,
random_state=0)
-for item in x_test:
- complete_x.append((counter, item))
- complete_y.append((counter, y_test[counter]))
- counter = counter + 1
-x_size = counter
-model = LinearRegression()
-model.fit(x_train, y_train)
-preds = (model.predict(x_test) - y_test) ** 2
-f_l2 = sum(preds)/x_size
-errors = []
-counter = 0
-for pred in preds:
- errors.append((counter, pred))
- counter = counter + 1
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance
propagated into optimization function
-<<<<<<< HEAD:scripts/staging/slicing/base/tests/regression/test_salary.py
-slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors,
debug=True, alpha=4, k=10, w=0.5,
- loss_type=0)
-
-=======
-
-# enumerator <union>/<join> indicates an approach of next level slices
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
- slicer.process(all_features, model, complete_x, f_l2, x_size, y_test,
errors, debug=True, alpha=4, k=10, w=0.5,
- loss_type=0)
-elif enumerator == "union":
- union_slicer.process(all_features, model, complete_x, f_l2, x_size,
y_test, errors, debug=True, alpha=4, k=10, w=0.5,
- loss_type=0)
->>>>>>> [SYSTEMDS-xxx] Alternative slice enumeration
algorithm:scripts/staging/slicing/base/tests/regression/test_salary.py
diff --git a/scripts/staging/slicing/base/top_k.py
b/scripts/staging/slicing/base/top_k.py
index 87a5d56..3957fea 100644
--- a/scripts/staging/slicing/base/top_k.py
+++ b/scripts/staging/slicing/base/top_k.py
@@ -51,5 +51,8 @@ class Topk:
for candidate in self.slices:
print(candidate.name + ": " + "score = " + str(candidate.score) +
"; size = " + str(candidate.size))
- print(candidate.name + ": " + "score = " + str(candidate.score) +
"; size = " + str(candidate.size))
-
+ def buckets_top_k(self, cur_lvl_slices, x_size, alpha):
+ for bucket in cur_lvl_slices:
+ if bucket.check_constraint(self, x_size, alpha):
+ self.add_new_top_slice(bucket)
+ return self
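
A small local sketch of the new buckets_top_k helper above: every bucket that passes check_constraint (score at least the current top-k minimum and size at least x_size / alpha) is handed to add_new_top_slice. Scores and sizes below are made-up placeholders.

from slicing.base.Bucket import Bucket
from slicing.base.top_k import Topk

x_size, alpha, w, avg_loss = 100, 4, 0.5, 0.3
top_k = Topk(2)
candidates = []
for feature, score, size in [(0, 1.4, 60), (1, 0.7, 10), (2, 2.1, 40)]:
    bucket = Bucket(feature, 0, w, x_size, avg_loss)
    bucket.score, bucket.size = score, size        # normally set via calc_bucket_metrics
    candidates.append(bucket)
top_k = top_k.buckets_top_k(candidates, x_size, alpha)
top_k.print_topk()                                  # surviving buckets with score and size
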
diff --git a/scripts/staging/slicing/base/union_slicer.py
b/scripts/staging/slicing/base/union_slicer.py
index 6ca003f..6b5fb0e 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -19,9 +19,9 @@
#
#-------------------------------------------------------------
-from slicing.node import Node
-from slicing.top_k import Topk
-from slicing.slicer import opt_fun, union
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
+from slicing.base.slicer import opt_fun, union
def check_attributes(left_node, right_node):
@@ -35,15 +35,12 @@ def check_attributes(left_node, right_node):
return flag
-def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type):
+def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k):
+ all_nodes = {}
counter = 0
- # First level slices are enumerated in a "classic way" (getting data and
not analyzing bounds
first_level = []
- levels = []
- all_nodes = {}
- top_k = Topk(k)
for feature in all_features:
- new_node = Node(all_features, model, complete_x, loss, x_size, y_test,
errors)
+ new_node = Node(complete_x, loss, x_size, y_test, errors)
new_node.parents = [(feature, counter)]
new_node.attributes.append((feature, counter))
new_node.name = new_node.make_name()
@@ -65,12 +62,23 @@ def process(all_features, model, complete_x, loss, x_size,
y_test, errors, debug
# this method updates top k slices if needed
top_k.add_new_top_slice(new_node)
counter = counter + 1
- # double appending of first level nodes in order to enumerating second
level in the same way as others
- levels.append((first_level, len(all_features)))
- levels.append((first_level, len(all_features)))
+ return first_level, all_nodes
+
+def union_enum():
+ return None
+
+
+def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type, b_update):
+ top_k = Topk(k)
+ # First level slices are enumerated in a "classic way" (getting data and
not analyzing bounds
+ levels = []
+ first_level = make_first_level(all_features, complete_x, loss, x_size,
y_test, errors, loss_type, w, alpha, top_k)
+ # double appending of first level nodes in order to enumerating second
level in the same way as others
+ levels.append((first_level[0], len(all_features)))
+ all_nodes = first_level[1]
# cur_lvl - index of current level, correlates with number of slice
forming features
- cur_lvl = 2 # level that is planned to be filled later
+ cur_lvl = 1 # level that is planned to be filled later
cur_lvl_nodes = first_level
# currently for debug
print("Level 1 had " + str(len(all_features)) + " candidates")
@@ -81,13 +89,13 @@ def process(all_features, model, complete_x, loss, x_size,
y_test, errors, debug
while len(cur_lvl_nodes) > 0:
cur_lvl_nodes = []
count = 0
- for left in range(int(cur_lvl / 2)):
+ for left in range(int(cur_lvl / 2) + 1):
right = cur_lvl - 1 - left
for node_i in range(len(levels[left][0])):
for node_j in range(len(levels[right][0])):
flag = check_attributes(levels[left][0][node_i],
levels[right][0][node_j])
if not flag:
- new_node = Node(all_features, model, complete_x, loss,
x_size, y_test, errors)
+ new_node = Node(complete_x, loss, x_size, y_test,
errors)
parents_set = set(new_node.parents)
parents_set.add(levels[left][0][node_i])
parents_set.add(levels[right][0][node_j])
@@ -103,32 +111,12 @@ def process(all_features, model, complete_x, loss,
x_size, y_test, errors, debug
existing_item = all_nodes[new_node.key[1]]
parents_set = set(existing_item.parents)
existing_item.parents = parents_set
- s_upper = new_node.calc_s_upper(cur_lvl)
- s_lower = new_node.calc_s_lower(cur_lvl)
- e_upper = new_node.calc_e_upper()
- e_max_upper = new_node.calc_e_max_upper(cur_lvl)
- try:
- minimized = min(s_upper, new_node.s_upper)
- new_node.s_upper = minimized
- minimized = min(s_lower, new_node.s_lower)
- new_node.s_lower = minimized
- minimized = min(e_upper, new_node.e_upper)
- new_node.e_upper = minimized
- minimized= min(e_max_upper,
new_node.e_max_upper)
- new_node.e_max_upper = minimized
- c_upper = new_node.calc_c_upper(w)
- minimized= min(c_upper, new_node.c_upper)
- new_node.c_upper = minimized
- except AttributeError:
- # initial bounds calculation
- new_node.s_upper = s_upper
- new_node.s_lower = s_lower
- new_node.e_upper = e_upper
- new_node.e_max_upper = e_max_upper
- c_upper = new_node.calc_c_upper(w)
- new_node.c_upper = c_upper
- minimized = min(c_upper, new_node.c_upper)
- new_node.c_upper = minimized
+ if b_update:
+ s_upper = new_node.calc_s_upper(cur_lvl)
+ s_lower = new_node.calc_s_lower(cur_lvl)
+ e_upper = new_node.calc_e_upper()
+ e_max_upper =
new_node.calc_e_max_upper(cur_lvl)
+ new_node.update_bounds(s_upper, s_lower,
e_upper, e_max_upper, w)
else:
new_node.calc_bounds(cur_lvl, w)
all_nodes[new_node.key[1]] = new_node
@@ -140,13 +128,9 @@ def process(all_features, model, complete_x, loss, x_size,
y_test, errors, debug
new_node.score = opt_fun(new_node.loss,
new_node.size, loss, x_size, w)
# we decide to add node to current level nodes
(in order to make new combinations
# on the next one or not basing on its score
value
- if new_node.score >= top_k.min_score and
new_node.size >= x_size / alpha \
- and new_node.key not in top_k.keys:
- cur_lvl_nodes.append(new_node)
+ if new_node.check_constraint(top_k, x_size,
alpha) and new_node.key not in top_k.keys:
top_k.add_new_top_slice(new_node)
- else:
- if new_node.s_upper >= x_size / alpha and
new_node.c_upper >= top_k.min_score:
- cur_lvl_nodes.append(new_node)
+ cur_lvl_nodes.append(new_node)
if debug:
new_node.print_debug(top_k, cur_lvl)
count = count + levels[left][1] * levels[right][1]
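
As an aside, a tiny illustration of the DPSize-style level pairing in the loop above: at level cur_lvl, slices stored at list index left are combined with slices at index right = cur_lvl - 1 - left (0-based indices into the levels list, so index 0 holds the 1-attribute slices).

for cur_lvl in range(1, 5):
    pairs = [(left, cur_lvl - 1 - left) for left in range(int(cur_lvl / 2) + 1)]
    print(cur_lvl, pairs)
# 1 [(0, 0)]
# 2 [(0, 1), (1, 0)]
# 3 [(0, 2), (1, 1)]
# 4 [(0, 3), (1, 2), (2, 1)]
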
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py
b/scripts/staging/slicing/spark_modules/join_data_parallel.py
new file mode 100644
index 0000000..78156c3
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -0,0 +1,120 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.Bucket import Bucket
+from slicing.base.SparkNode import SparkNode
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import approved_join_slice
+
+
+def rows_mapper(row, buckets, loss_type):
+ filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in
bucket[1].attributes), buckets.items()))
+ for item in filtered:
+ filtered[item].update_metrics(row, loss_type)
+ return filtered
+
+
+def join_enum(cur_lvl_nodes, cur_lvl, x_size, alpha, top_k, w, loss):
+ buckets = {}
+ for node_i in range(len(cur_lvl_nodes)):
+ for node_j in range(node_i + 1, len(cur_lvl_nodes)):
+ flag = approved_join_slice(cur_lvl_nodes[node_i],
cur_lvl_nodes[node_j], cur_lvl)
+ if flag:
+ node = SparkNode(None, None)
+ node.attributes = list(set(cur_lvl_nodes[node_i].attributes) |
set(cur_lvl_nodes[node_j].attributes))
+ bucket = Bucket(node, cur_lvl, w, x_size, loss)
+ bucket.parents.append(cur_lvl_nodes[node_i])
+ bucket.parents.append(cur_lvl_nodes[node_j])
+ bucket.calc_bounds(w, x_size, loss)
+ if bucket.check_bounds(x_size, alpha, top_k):
+ buckets[bucket.name] = bucket
+ return buckets
+
+
+def combiner(a):
+ return a
+
+
+def merge_values(a, b):
+ a.combine_with(b)
+ return a
+
+
+def merge_combiners(a, b):
+ a + b
+ return a
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w,
loss_type):
+ top_k = Topk(k)
+ cur_lvl = 0
+ cur_lvl_nodes = list(all_features)
+ pred_pandas = predictions.toPandas()
+ x_size = len(pred_pandas)
+ b_topk = SparkContext.broadcast(sc, top_k)
+ b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+ b_cur_lvl_nodes = SparkContext.broadcast(sc, cur_lvl_nodes)
+ buckets = {}
+ for node in b_cur_lvl_nodes.value:
+ bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+ buckets[bucket.name] = bucket
+ b_buckets = SparkContext.broadcast(sc, buckets)
+ rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
+ .map(lambda item: (item[0], item[1].tolist(), item[2]))
+ mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
+ flattened = mapped.flatMap(lambda line: (line.items()))
+ reduced = flattened.combineByKey(combiner, merge_values, merge_combiners)
+ cur_lvl_nodes = reduced.values()\
+ .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w,
x_size, b_cur_lvl.value))
+ if debug:
+ cur_lvl_nodes.map(lambda bucket:
bucket.print_debug(b_topk.value)).collect()
+ cur_lvl = 1
+ prev_level = cur_lvl_nodes.collect()
+ top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+ while len(prev_level) > 0:
+ b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+ b_topk = SparkContext.broadcast(sc, top_k)
+ b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+ b_topk.value.print_topk()
+ buckets = join_enum(b_cur_lvl_nodes.value, b_cur_lvl.value, x_size,
alpha, b_topk.value, w, loss)
+ b_buckets = SparkContext.broadcast(sc, buckets)
+ to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size,
alpha, b_topk.value), b_buckets.value.items()))
+ mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+ flattened = mapped.flatMap(lambda line: (line.items()))
+ to_process = flattened.combineByKey(combiner, merge_values,
merge_combiners)
+ if debug:
+ to_process.values().map(lambda bucket:
bucket.print_debug(b_topk.value)).collect()
+ prev_level = to_process\
+ .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1],
loss, w, x_size, b_cur_lvl.value))\
+ .collect()
+ cur_lvl = b_cur_lvl.value + 1
+ top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+ print("Level " + str(b_cur_lvl.value) + " had " + str(
+ len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+"
candidates but after pruning only " +
+ str(len(prev_level)) + " go to the next level")
+ print("Program stopped at level " + str(cur_lvl))
+ print()
+ print("Selected slices are: ")
+ b_topk.value.print_topk()
+ return None
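
A hypothetical driver for the data-parallel variant above, kept deliberately tiny: the (id, SparseVector features, error) column layout and the integer feature ids follow the row handling in parallel_process, while the Spark session setup and the toy data are assumptions, not part of this patch.

from pyspark.ml.linalg import SparseVector
from pyspark.sql import SparkSession

from slicing.spark_modules import join_data_parallel

spark = SparkSession.builder.appName("slicing-data-parallel").getOrCreate()
sc = spark.sparkContext
num_features = 5
rows = [
    (0, SparseVector(num_features, {1: 1.0, 3: 1.0}), 0.20),
    (1, SparseVector(num_features, {1: 1.0}), 0.70),
    (2, SparseVector(num_features, {2: 1.0, 3: 1.0}), 0.05),
]
preds_df = spark.createDataFrame(rows, ["id", "features", "error"])
avg_loss = sum(r[2] for r in rows) / len(rows)
join_data_parallel.parallel_process(list(range(num_features)), preds_df, avg_loss, sc,
                                    debug=True, alpha=4, k=10, w=0.5, loss_type=0)
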
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py
b/scripts/staging/slicing/spark_modules/spark_slicer.py
new file mode 100644
index 0000000..86f2b34
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -0,0 +1,100 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.SparkNode import SparkNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w,
loss_type, cur_lvl, top_k):
+ x_size = len(predictions)
+ nodes = []
+ for node_i in range(len(list_b)):
+ flag = spark_utils.approved_join_slice(node_i, node_a, cur_lvl)
+ if not flag:
+ new_node = SparkNode(predictions, f_l2)
+ parents_set = set(new_node.parents)
+ parents_set.add(node_i)
+ parents_set.add(node_a)
+ new_node.parents = list(parents_set)
+ parent1_attr = node_a.attributes
+ parent2_attr = list_b[node_i].attributes
+ new_node_attr = union(parent1_attr, parent2_attr)
+ new_node.attributes = new_node_attr
+ new_node.name = new_node.make_name()
+ new_node.calc_bounds(cur_lvl, w)
+ # check if concrete data should be extracted or not (only for
those that have score upper
+ # and if size of subset is big enough
+ to_slice = new_node.check_bounds(top_k, x_size, alpha)
+ if to_slice:
+ new_node.process_slice(loss_type)
+ new_node.score = opt_fun(new_node.loss, new_node.size, f_l2,
x_size, w)
+ # we decide to add node to current level nodes (in order to
make new combinations
+ # on the next one or not basing on its score value
+ if new_node.check_constraint(top_k, x_size, alpha) and
new_node.key not in top_k.keys:
+ top_k.add_new_top_slice(new_node)
+ nodes.append(new_node)
+ if debug:
+ new_node.print_debug(top_k, cur_lvl)
+ return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w,
loss_type, enumerator):
+ top_k = Topk(k)
+ cur_lvl = 0
+ levels = []
+ first_level = {}
+ all_features = list(all_features)
+ first_tasks = sc.parallelize(all_features)
+ b_topk = SparkContext.broadcast(sc, top_k)
+ init_slices = first_tasks.mapPartitions(lambda features:
spark_utils.make_first_level(features, predictions, loss,
+
b_topk.value, w, loss_type)) \
+ .map(lambda node: (node.key, node)).collect()
+ first_level.update(init_slices)
+ update_top_k(first_level, b_topk.value, alpha, predictions)
+ prev_level = SparkContext.broadcast(sc, first_level)
+ levels.append(prev_level)
+ cur_lvl = cur_lvl + 1
+ b_topk.value.print_topk()
+ # checking the first partition of level. if not empty then processing
otherwise no elements were added to this level
+ while len(levels[cur_lvl - 1].value) > 0:
+ nodes_list = {}
+ partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
+ mapped = partitions.mapPartitions(lambda nodes:
spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
+
predictions, loss, b_topk.value, alpha, k, w,
+
loss_type, cur_lvl, debug, enumerator))
+ flattened = mapped.flatMap(lambda node: node)
+ nodes_list.update(flattened.map(lambda node: (node.key,
node)).distinct().collect())
+ prev_level = SparkContext.broadcast(sc, nodes_list)
+ levels.append(prev_level)
+ update_top_k(nodes_list, b_topk.value, alpha, predictions)
+ cur_lvl = cur_lvl + 1
+ b_topk.value.print_topk()
+ print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl -
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
+ " candidates but after pruning only " + str(len(nodes_list)) + "
go to the next level")
+ print("Program stopped at level " + str(cur_lvl))
+ print()
+ print("Selected slices are: ")
+ b_topk.value.print_topk()
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py
b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
new file mode 100644
index 0000000..811baff
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -0,0 +1,70 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+from pyspark import SparkContext
+
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import update_top_k
+
+
+def process(all_features, predictions, loss, sc, debug, alpha, k, w,
loss_type, enumerator):
+ top_k = Topk(k)
+ cur_lvl = 0
+ levels = []
+ all_features = list(all_features)
+ first_level = {}
+ first_tasks = sc.parallelize(all_features)
+ b_topk = SparkContext.broadcast(sc, top_k)
+ init_slices = first_tasks.mapPartitions(lambda features: spark_utils.make_first_level(
+         features, predictions, loss, b_topk.value, w, loss_type)) \
+     .map(lambda node: (node.key, node)) \
+     .collect()
+ first_level.update(init_slices)
+ update_top_k(first_level, b_topk.value, alpha, predictions)
+ prev_level = SparkContext.broadcast(sc, first_level)
+ levels.append(prev_level)
+ cur_lvl = 1
+ b_topk.value.print_topk()
+ while len(levels[cur_lvl - 1].value) > 0:
+ cur_lvl_res = {}
+ for left in range(int(cur_lvl / 2) + 1):
+ right = cur_lvl - left - 1
+ partitions = sc.parallelize(levels[left].value.values())
+ mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(
+         nodes, levels[right].value.values(), predictions, loss, b_topk.value,
+         alpha, k, w, loss_type, cur_lvl, debug, enumerator))
+ flattened = mapped.flatMap(lambda node: node)
+ partial = flattened.map(lambda node: (node.key, node)).collect()
+ cur_lvl_res.update(partial)
+ prev_level = SparkContext.broadcast(sc, cur_lvl_res)
+ levels.append(prev_level)
+ update_top_k(cur_lvl_res, b_topk.value, alpha, predictions)
+ cur_lvl = cur_lvl + 1
+ top_k.print_topk()
+ print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl -
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
+ " candidates but after pruning only " +
str(len(prev_level.value)) + " go to the next level")
+ print("Program stopped at level " + str(cur_lvl))
+ print()
+ print("Selected slices are: ")
+ b_topk.value.print_topk()
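The nested loop above follows the DPSize-style enumeration: at level cur_lvl it unions slices from complementary earlier levels. A minimal plain-Python sketch of which (left, right) indices into the levels list get paired:

for cur_lvl in range(1, 5):
    pairs = [(left, cur_lvl - left - 1) for left in range(int(cur_lvl / 2) + 1)]
    print(cur_lvl, pairs)
# 1 [(0, 0)]
# 2 [(0, 1), (1, 0)]
# 3 [(0, 2), (1, 1)]
# 4 [(0, 3), (1, 2), (2, 1)]

Since levels[i] roughly holds slices built from i + 1 first-level slices, each pair contributes candidates of the new level; the partial results are keyed by node.key, so any candidate produced more than once is kept only once.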
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py
b/scripts/staging/slicing/spark_modules/spark_utils.py
new file mode 100644
index 0000000..a12c84b
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_utils.py
@@ -0,0 +1,141 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkNode import SparkNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target,
prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+ if type == 0:
+ return (prediction - target) ** 2
+ elif type == 1:
+ return float(target == prediction)
+
+
+def init_model_type(model_type):
+ if model_type == "regression":
+ return 0
+ elif model_type == "classification":
+ return 1
+
+
+def approved_join_slice(node_i, node_j, cur_lvl):
+ commons = len(list(set(node_i.attributes) & set(node_j.attributes)))
+ return commons == cur_lvl - 1
+
+
+def make_first_level(features, predictions, loss, top_k, w, loss_type):
+ first_level = []
+ # first-level slices are enumerated in the "classic" way (materializing the data without analyzing bounds)
+ for feature in features:
+ new_node = SparkNode(loss, predictions)
+ new_node.parents = [feature]
+ new_node.attributes.append(feature)
+ new_node.name = new_node.make_name()
+ new_node.key = new_node.make_key()
+ new_node.process_slice(loss_type)
+ new_node.score = opt_fun(new_node.loss, new_node.size, loss,
len(predictions), w)
+ new_node.c_upper = new_node.score
+ first_level.append(new_node)
+ new_node.print_debug(top_k, 0)
+ return first_level
+
+
+def approved_union_slice(node_i, node_j):
+ for attr1 in node_i.attributes:
+ for attr2 in node_j.attributes:
+ if attr1 == attr2:
+ # the nodes share an attribute, so they do not qualify for a union slice
+ return False
+ return True
+
+
+def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha,
loss_type, w, debug, enumerator):
+ cur_enum_nodes = []
+ for node_j in level:
+ if enumerator == "join":
+ flag = approved_join_slice(node_i, node_j, cur_lvl)
+ else:
+ flag = approved_union_slice(node_i, node_j)
+ if flag and int(node_i.name.split("&&")[0]) <
int(node_j.name.split("&&")[0]):
+ new_node = SparkNode(loss, predictions)
+ parents_set = set(new_node.parents)
+ parents_set.add(node_i)
+ parents_set.add(node_j)
+ new_node.parents = list(parents_set)
+ parent1_attr = node_i.attributes
+ parent2_attr = node_j.attributes
+ new_node_attr = union(parent1_attr, parent2_attr)
+ new_node.attributes = new_node_attr
+ new_node.name = new_node.make_name()
+ new_node.key = new_node.make_key()
+ new_node.calc_bounds(cur_lvl, w)
+ to_slice = new_node.check_bounds(top_k, len(predictions), alpha)
+ if to_slice:
+ new_node.process_slice(loss_type)
+ new_node.score = opt_fun(new_node.loss, new_node.size, loss,
len(predictions), w)
+ if new_node.check_constraint(top_k, len(predictions), alpha):
+ cur_enum_nodes.append(new_node)
+ if debug:
+ new_node.print_debug(top_k, cur_lvl)
+ return cur_enum_nodes
+
+
+def nodes_enum(nodes, level, predictions, loss, top_k, alpha, k, w, loss_type,
cur_lvl, debug, enumerator):
+ cur_enum_nodes = []
+ for node_i in nodes:
+ partial_nodes = process_node(node_i, level, loss, predictions,
cur_lvl, top_k, alpha,
+ loss_type, w, debug, enumerator)
+ cur_enum_nodes.append(partial_nodes)
+ return cur_enum_nodes
+
+
+def init_top_k(first_level, top_k, alpha, predictions):
+ # driver updates topK
+ for sliced in first_level:
+ if sliced[1].check_constraint(top_k, len(predictions), alpha):
+ # this method updates top k slices if needed
+ top_k.add_new_top_slice(sliced[1])
+
+
+def update_top_k(new_slices, top_k, alpha, predictions):
+ # driver updates topK
+ for sliced in new_slices.values():
+ if sliced.check_constraint(top_k, len(predictions), alpha):
+ # this method updates top k slices if needed
+ top_k.add_new_top_slice(sliced)
+
+
+def calc_bucket_metrics(bucket, loss, w, x_size, cur_lvl):
+ bucket.calc_error()
+ bucket.score = opt_fun(bucket.error, bucket.size, loss, x_size, w)
+ if cur_lvl == 0:
+ bucket.s_upper = bucket.size
+ bucket.c_upper = bucket.score
+ bucket.s_lower = 1
+ return bucket
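A small self-contained check of the helpers above (assuming pyspark is installed and the slicing package is importable; the toy nodes only need an attributes field):

from types import SimpleNamespace

from slicing.spark_modules.spark_utils import (approved_join_slice, approved_union_slice,
                                               calc_loss_fun, init_model_type)

# per-row loss: squared error for regression (type 0), 0/1 match for classification (type 1)
assert calc_loss_fun(3.0, 2.5, init_model_type("regression")) == 0.25
assert calc_loss_fun(1, 1, init_model_type("classification")) == 1.0

# join rule: two parent slices combine only if they share exactly cur_lvl - 1 common attributes
a = SimpleNamespace(attributes=[1, 2])
b = SimpleNamespace(attributes=[2, 3])
c = SimpleNamespace(attributes=[3, 4])
assert approved_join_slice(a, b, cur_lvl=2)        # share {2}
assert not approved_join_slice(a, c, cur_lvl=2)    # share nothing

# union rule: only slices with disjoint attribute sets are combined
assert approved_union_slice(a, c)
assert not approved_union_slice(a, b)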
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py
b/scripts/staging/slicing/spark_modules/union_data_parallel.py
new file mode 100644
index 0000000..b000abe
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -0,0 +1,119 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.Bucket import Bucket
+from slicing.base.SparkNode import SparkNode
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils, join_data_parallel
+from slicing.spark_modules.join_data_parallel import rows_mapper, combiner
+from slicing.spark_modules.spark_utils import approved_union_slice
+
+
+def merge_values(a, b):
+ a.minimize_bounds(b)
+ return a
+
+
+def merge_combiners(a, b):
+ a + b
+ return a
+
+
+def union_enum(left_level, right_level, x_size, alpha, top_k, w, loss,
cur_lvl):
+ buckets = {}
+ for node_i in range(len(left_level)):
+ for node_j in range(len(right_level)):
+ flag = approved_union_slice(left_level[node_i],
right_level[node_j])
+ if flag:
+ node = SparkNode(None, None)
+ node.attributes = list(set(left_level[node_i].attributes) |
set(right_level[node_j].attributes))
+ bucket = Bucket(node, cur_lvl, w, x_size, loss)
+ bucket.parents.append(left_level[node_i])
+ bucket.parents.append(right_level[node_j])
+ bucket.calc_bounds(w, x_size, loss)
+ if bucket.check_bounds(x_size, alpha, top_k):
+ buckets[bucket.name] = bucket
+ return buckets
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w,
loss_type):
+ top_k = Topk(k)
+ cur_lvl = 0
+ levels = []
+ cur_lvl_nodes = list(all_features)
+ pred_pandas = predictions.toPandas()
+ x_size = len(pred_pandas)
+ b_topk = SparkContext.broadcast(sc, top_k)
+ b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+ buckets = {}
+ for node in cur_lvl_nodes:
+ bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+ buckets[bucket.name] = bucket
+ b_buckets = SparkContext.broadcast(sc, buckets)
+ rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
+ .map(lambda item: (item[0], item[1].tolist(), item[2]))
+ mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
+ flattened = mapped.flatMap(lambda line: (line.items()))
+ reduced = flattened.combineByKey(combiner,
join_data_parallel.merge_values, join_data_parallel.merge_combiners)
+ cur_lvl_nodes = reduced.values() \
+ .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w,
x_size, b_cur_lvl.value))
+ if debug:
+ cur_lvl_nodes.map(lambda bucket:
bucket.print_debug(b_topk.value)).collect()
+ cur_lvl = 1
+ prev_level = cur_lvl_nodes.collect()
+ b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+ levels.append(b_cur_lvl_nodes)
+ top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+ while len(levels[cur_lvl - 1].value) > 0:
+ b_topk = SparkContext.broadcast(sc, top_k)
+ b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+ b_topk.value.print_topk()
+ buckets = []
+ for left in range(int(cur_lvl / 2) + 1):
+ right = cur_lvl - left - 1
+ nodes = union_enum(levels[left].value, levels[right].value,
x_size, alpha, b_topk.value, w, loss, b_cur_lvl.value)
+ buckets.append(nodes)
+ b_buckets = sc.parallelize(buckets)
+ all_buckets = b_buckets.flatMap(lambda line: (line.items()))
+ combined = dict(all_buckets.combineByKey(combiner, merge_values,
merge_combiners).collect())
+ b_buckets = SparkContext.broadcast(sc, combined)
+ to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size,
alpha, b_topk.value), b_buckets.value.items()))
+ mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+ flattened = mapped.flatMap(lambda line: (line.items()))
+ partial = flattened.combineByKey(combiner,
join_data_parallel.merge_values, join_data_parallel.merge_combiners)
+ prev_level = partial\
+ .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1],
loss, w, x_size, b_cur_lvl.value)).collect()
+ if debug:
+ partial.values().map(lambda bucket:
bucket.print_debug(b_topk.value)).collect()
+ top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+ print("Level " + str(cur_lvl) + " had " + str(
+ len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) -
1)) +
+ " candidates but after pruning only " + str(len(prev_level)) + "
go to the next level")
+ print("Program stopped at level " + str(cur_lvl))
+ b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+ levels.append(b_cur_lvl_nodes)
+ cur_lvl = b_cur_lvl.value + 1
+ print()
+ print("Selected slices are: ")
+ b_topk.value.print_topk()
+ return None
diff --git a/scripts/staging/slicing/tests/__init__.py
b/scripts/staging/slicing/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/classification/__init__.py
b/scripts/staging/slicing/tests/classification/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/classification/sparked_adults.py
b/scripts/staging/slicing/tests/classification/sparked_adults.py
new file mode 100644
index 0000000..9c37fef
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/sparked_adults.py
@@ -0,0 +1,118 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator,
VectorAssembler
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+ten_binner = sf.udf(lambda arg: int(arg / 10))
+fnlwgt_binner = sf.udf(lambda arg: int(arg // 100000))
+edu_binner = sf.udf(lambda arg: int(arg / 5))
+cap_gain_binner = sf.udf(lambda arg: int(arg / 1000))
+
+conf = SparkConf().setAppName("adults_test").setMaster('local[2]')
+num_partitions = 2
+model_type = "classification"
+label = 'Income'
+sparkContext = SparkContext(conf=conf)
+sqlContext = SQLContext(sparkContext)
+dataset_df = sqlContext.read.csv('/slicing/datasets/adult.csv', header='true',
inferSchema='true')
+# initializing stages of main transformation pipeline
+stages = []
+dataset_df = dataset_df.withColumn('age_bin', ten_binner(dataset_df['Age']))
+dataset_df = dataset_df.withColumn('fnlwgt_bin',
fnlwgt_binner(dataset_df['fnlwgt']))
+dataset_df = dataset_df.withColumn('edu_num_bin',
edu_binner(dataset_df['EducationNum']))
+dataset_df = dataset_df.withColumn('cap_gain_bin',
cap_gain_binner(dataset_df['CapitalGain']))
+dataset_df = dataset_df.withColumn('hours_per_w_bin',
ten_binner(dataset_df['HoursPerWeek']))
+dataset_df = dataset_df.withColumn('model_type', sf.lit(1))
+
+# list of categorical features for further hot-encoding
+cat_features = ["age_bin", "WorkClass", "fnlwgt_bin", "Education",
"edu_num_bin",
+ "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+ "cap_gain_bin", "CapitalLoss", "hours_per_w_bin", "NativeCountry"]
+
+# hot encoding categorical features
+for feature in cat_features:
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature +
"_index")
+ encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+ encoder.setDropLast(False)
+ stages += [string_indexer, encoder]
+assembler_inputs = [feature + "_vec" for feature in cat_features]
+assembler = VectorAssembler(inputCols=assembler_inputs,
outputCol="assembled_inputs")
+stages += [assembler]
+assembler_final = VectorAssembler(inputCols=["assembled_inputs"],
outputCol="features")
+label_indexer = StringIndexer(inputCol=label, outputCol=label+"_idx")
+stages += [assembler_final]
+stages += [label_indexer]
+pipeline = Pipeline(stages=stages)
+pipeline_model = pipeline.fit(dataset_df)
+dataset_transformed = pipeline_model.transform(dataset_df)
+cat_dict = []
+decode_dict = {}
+counter = 0
+cat = 0
+for feature in cat_features:
+ colIdx = dataset_transformed.select(feature, feature +
"_index").distinct().rdd.collectAsMap()
+ colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item:
item[1])}
+ for item in colIdx:
+ decode_dict[counter] = (cat, item, colIdx[item])
+ counter = counter + 1
+ cat = cat + 1
+df_transform_fin = dataset_transformed.select('features', label+"_idx",
'model_type')
+splits = df_transform_fin.randomSplit([0.8, 0.2], seed=1234)
+train_df = splits[0]
+test_df = splits[1]
+rf = RandomForestClassifier(featuresCol='features', labelCol=label+"_idx",
numTrees=10)
+rf_model = rf.fit(train_df)
+predictions = rf_model.transform(test_df)
+# Select example rows to display.
+predictions.select("features", label+"_idx", "prediction", "model_type")
+# Select (prediction, true label) and compute test error
+evaluator = MulticlassClassificationEvaluator(
+ labelCol=label+"_idx", predictionCol="prediction", metricName="accuracy")
+accuracy = evaluator.evaluate(predictions)
+loss = 1.0 - accuracy
+pred_df_fin = predictions.withColumn('error', spark_utils.calc_loss(predictions[label+"_idx"], predictions['prediction'], predictions['model_type']))
+predictions = pred_df_fin.select('features',
'error').repartition(num_partitions)
+all_features = range(predictions.toPandas().values[0][0].size)
+predictions = predictions.collect()
+k = 10
+SparkContext.broadcast(sparkContext, loss)
+SparkContext.broadcast(sparkContext, predictions)
+SparkContext.broadcast(sparkContext, all_features)
+SparkContext.broadcast(sparkContext, decode_dict)
+enumerator = "join"
+if enumerator == "join":
+ spark_slicer.parallel_process(all_features, predictions, loss, sparkContext, debug=True, alpha=6, k=k, w=0.5,
+                               loss_type=0, enumerator="join")
+elif enumerator == "union":
+ spark_union_slicer.process(all_features, predictions, loss, sparkContext, debug=True, alpha=4, k=k, w=0.5, loss_type=0,
+                            enumerator="union")
+
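The binner udfs above coarsen the numeric attributes into coarse bins before one-hot encoding; a minimal plain-Python sketch with illustrative input values:

ten_bin = lambda arg: int(arg / 10)           # Age, HoursPerWeek
fnlwgt_bin = lambda arg: int(arg // 100000)   # fnlwgt
edu_bin = lambda arg: int(arg / 5)            # EducationNum
cap_gain_bin = lambda arg: int(arg / 1000)    # CapitalGain
assert (ten_bin(37), fnlwgt_bin(215646), edu_bin(13), cap_gain_bin(2174)) == (3, 2, 2, 2)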
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py
b/scripts/staging/slicing/tests/classification/test_adult.py
new file mode 100644
index 0000000..9575592
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/test_adult.py
@@ -0,0 +1,121 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 4
+ b_update = True
+ debug = True
+ loss_type = 1
+ enumerator = "union"
+ dataset = pd.read_csv('/slicing/datasets/adult.csv')
+ attributes_amount = len(dataset.values[0])
+ x = dataset.iloc[:, 0:attributes_amount - 1].values
+ y = dataset.iloc[:, attributes_amount - 1]
+ le = preprocessing.LabelEncoder()
+ le.fit(y)
+ y = le.transform(y)
+ complete_x = []
+ complete_y = []
+ counter = 0
+ all_indexes = []
+ not_encoded_columns = [
+ "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
+ "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+ "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
+ ]
+ for row in x:
+ row[0] = int(row[0] / 10)
+ row[2] = int(row[2]) // 100000
+ row[4] = int(row[4] / 5)
+ row[10] = int(row[10] / 1000)
+ row[12] = int(row[12] / 10)
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x).toarray()
+ all_features = enc.get_feature_names()
+ x_size = len(complete_x)
+ x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2,
random_state=0)
+ for item in x_test:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ x_size = counter
+ clf = RandomForestClassifier(n_jobs=2, random_state=0)
+ clf.fit(x_train, y_train)
+ RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
+ max_depth=None, max_features='auto', max_leaf_nodes=None,
+ min_impurity_split=1e-07, min_samples_leaf=1,
+ min_samples_split=2, min_weight_fraction_leaf=0.0,
+ n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+ verbose=0, warm_start=False)
+
+ # alpha is the size significance coefficient
+ # debug option returns and prints debug info while creating slices
+ # k is the number of top slices we want
+ # w is the weight of the error term; (1 - w) is the size weight propagated into the optimization function
+ # loss_type = 0: l2 loss (regression models)
+ # loss_type = 1: cross entropy (classification models)
+ preds = clf.predict(x_test)
+ predictions = []
+ counter = 0
+ mistakes = 0
+ for pred in preds:
+ predictions.append((counter, pred))
+ if y_test[counter] != pred:
+ mistakes = mistakes + 1
+ counter = counter + 1
+ lossF = mistakes / counter
+
+ # enumerator <union>/<join> selects how slices of the next level are combined:
+ # with <join>, the slicer creates new nodes of the current level
+ # by combining only nodes of the previous level with each other;
+ # the <union> case is based on the DPSize algorithm
+ if enumerator == "join":
+ slicer.process(all_features, complete_x, lossF, x_size, complete_y,
predictions, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type, b_update=b_update)
+ elif enumerator == "union":
+ union_slicer.process(all_features, complete_x, lossF, x_size,
complete_y, predictions, debug=debug, alpha=alpha,
+ k=k, w=w, loss_type=loss_type, b_update=b_update)
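The base slicer consumes parallel lists of (row_index, value) tuples over the test set; a minimal sketch of the shapes handed to slicer.process / union_slicer.process above (values are illustrative):

# complete_x[i]  == (i, one_hot_encoded_row_i)
# complete_y[i]  == (i, true_label_i)
# predictions[i] == (i, predicted_label_i)
complete_x = [(0, [1.0, 0.0, 0.0, 1.0]), (1, [0.0, 1.0, 1.0, 0.0])]
complete_y = [(0, 1), (1, 0)]
predictions = [(0, 1), (1, 1)]
# lossF is the overall misclassification rate passed to the slicer as the model loss
lossF = sum(1 for (_, p), (_, y) in zip(predictions, complete_y) if p != y) / len(complete_y)
assert lossF == 0.5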
diff --git a/scripts/staging/slicing/tests/classification/test_iris.py
b/scripts/staging/slicing/tests/classification/test_iris.py
new file mode 100644
index 0000000..2bd0c09
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/test_iris.py
@@ -0,0 +1,109 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 6
+ b_update = True
+ debug = True
+ loss_type = 1
+ enumerator = "join"
+ dataset = pd.read_csv('/slicing/datasets/iris.csv')
+ attributes_amount = len(dataset.values[0])
+ x = dataset.iloc[:, 0:attributes_amount - 1].values
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x).toarray()
+ y = dataset.iloc[:, attributes_amount - 1]
+ le = preprocessing.LabelEncoder()
+ le.fit(y)
+ y = le.transform(y)
+ complete_x = []
+ complete_y = []
+ counter = 0
+ all_indexes = []
+ all_features = enc.get_feature_names()
+ x_size = len(complete_x)
+ x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3,
random_state=0)
+ for item in x_test:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ x_size = counter
+ clf = RandomForestClassifier(n_jobs=2, random_state=0)
+ clf.fit(x_train, y_train)
+ RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
+ max_depth=None, max_features='auto', max_leaf_nodes=None,
+ min_impurity_split=1e-07, min_samples_leaf=1,
+ min_samples_split=2, min_weight_fraction_leaf=0.0,
+ n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+ verbose=0, warm_start=False)
+
+ # alpha is the size significance coefficient
+ # debug option returns and prints debug info while creating slices
+ # k is the number of top slices we want
+ # w is the weight of the error term; (1 - w) is the size weight propagated into the optimization function
+ # loss_type = 0: l2 loss (regression models)
+ # loss_type = 1: cross entropy (classification models)
+ preds = clf.predict(x_test)
+ predictions = []
+ counter = 0
+ mistakes = 0
+ for pred in preds:
+ predictions.append((counter, pred))
+ if y_test[counter] != pred:
+ mistakes = mistakes + 1
+ counter = counter + 1
+ lossF = mistakes / counter
+
+ # enumerator <union>/<join> selects how slices of the next level are combined:
+ # with <join>, the slicer creates new nodes of the current level
+ # by combining only nodes of the previous level with each other;
+ # the <union> case is based on the DPSize algorithm
+ if enumerator == "join":
+ slicer.process(all_features, complete_x, lossF, x_size, complete_y,
predictions, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type, b_update=b_update)
+ elif enumerator == "union":
+ union_slicer.process(all_features, complete_x, lossF, x_size,
complete_y, predictions, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/__init__.py
b/scripts/staging/slicing/tests/regression/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py
b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
new file mode 100644
index 0000000..c32414b
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
@@ -0,0 +1,131 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator,
VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, join_data_parallel,
union_data_parallel
+
+binner = udf(lambda arg: int(int(arg) // 5))
+
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 6
+ b_update = True
+ debug = True
+ loss_type = 0
+ enumerator = "union"
+
+ conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+ num_partitions = 2
+ model_type = "regression"
+ label = 'salary'
+ sparkContext = SparkContext(conf=conf)
+ sqlContext = SQLContext(sparkContext)
+ fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
+ header = fileRDD.first()
+ head_split = header.split(",")
+ head_split[0] = '_c0'
+ fileRDD = fileRDD.filter(lambda line: line != header)
+ data = fileRDD.map(lambda row: row.split(","))
+ dataset_df = sqlContext.createDataFrame(data, head_split)
+
+ cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+ # initializing stages of main transformation pipeline
+ stages = []
+ dataset_df = dataset_df.drop('_c0')
+ dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
+ # binning numeric features with a local binner udf (adjusted per dataset if needed)
+ dataset_df = dataset_df.withColumn('sincephd_bin',
binner(dataset_df['sincephd']))
+ dataset_df = dataset_df.withColumn('service_bin',
binner(dataset_df['service']))
+ dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+ dataset_df = dataset_df.drop('sincephd', 'service')
+ dataset_df = dataset_df.withColumn('target', dataset_df[label].cast("int"))
+ # hot encoding categorical features
+ for feature in cat_features:
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature +
"_index")
+ encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+ encoder.setDropLast(False)
+ stages += [string_indexer, encoder]
+ assembler_inputs = [feature + "_vec" for feature in cat_features]
+ assembler = VectorAssembler(inputCols=assembler_inputs,
outputCol="assembled_inputs")
+ stages += [assembler]
+ assembler_final = VectorAssembler(inputCols=["assembled_inputs"],
outputCol="features")
+ stages += [assembler_final]
+
+ pipeline = Pipeline(stages=stages)
+ pipeline_model = pipeline.fit(dataset_df)
+ dataset_transformed = pipeline_model.transform(dataset_df)
+ df_transform_fin = dataset_transformed.select('id', 'features', 'target',
'model_type').toPandas()
+
+ cat = 0
+ counter = 0
+ decode_dict = {}
+ for feature in cat_features:
+ colIdx = dataset_transformed.select(feature, feature +
"_index").distinct().rdd.collectAsMap()
+ colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item:
item[1])}
+ for item in colIdx:
+ decode_dict[counter] = (cat, item, colIdx[item])
+ counter = counter + 1
+ cat = cat + 1
+
+ train, test = train_test_split(df_transform_fin, test_size=0.3,
random_state=0)
+ train_df = sqlContext.createDataFrame(train)
+ test_df = sqlContext.createDataFrame(test)
+ lr = LinearRegression(featuresCol='features', labelCol='target',
maxIter=10, regParam=0.3, elasticNetParam=0.8)
+ lr_model = lr.fit(train_df)
+ eval = lr_model.evaluate(test_df)
+ f_l2 = eval.meanSquaredError
+ pred = eval.predictions
+ pred_df_fin = pred.withColumn('error',
spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
+ predictions = pred_df_fin.select('id', 'features',
'error').repartition(num_partitions)
+ converter = IndexToString(inputCol='features', outputCol='cats')
+ all_features = range(predictions.toPandas().values[1][1].size)
+ k = 10
+ if enumerator == "join":
+ join_data_parallel.parallel_process(all_features, predictions, f_l2,
sparkContext, debug=debug, alpha=alpha,
+ k=k, w=w, loss_type=loss_type)
+ elif enumerator == "union":
+ union_data_parallel.parallel_process(all_features, predictions, f_l2,
sparkContext, debug=debug, alpha=alpha,
+ k=k, w=w, loss_type=loss_type)
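decode_dict above maps the position of a one-hot column in the assembled feature vector back to its source attribute and original value; a hypothetical sketch of its contents for the salaries data (the actual StringIndexer indices depend on value frequencies):

# entry format: one_hot_column_index -> (categorical_feature_index, original_value, string_indexer_index)
decode_dict_example = {
    0: (0, 'Prof', 0.0),       # cat_features[0] = "rank"
    1: (0, 'AsstProf', 1.0),
    2: (0, 'AssocProf', 2.0),
    3: (1, 'A', 0.0),          # cat_features[1] = "discipline"
    4: (1, 'B', 1.0),
}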
diff --git a/scripts/staging/slicing/tests/regression/spark_salary.py
b/scripts/staging/slicing/tests/regression/spark_salary.py
new file mode 100644
index 0000000..52d0cf2
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/spark_salary.py
@@ -0,0 +1,123 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator,
VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+binner = udf(lambda arg: int(arg / 5))
+
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 6
+ b_update = True
+ debug = True
+ loss_type = 0
+ enumerator = "join"
+
+ conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+ num_partitions = 2
+ model_type = "regression"
+ label = 'salary'
+ sparkContext = SparkContext(conf=conf)
+ sqlContext = SQLContext(sparkContext)
+ dataset_df = sqlContext.read.csv('salaries.csv', header='true',
inferSchema='true')
+ # initializing stages of main transformation pipeline
+ stages = []
+ # list of categorical features for further hot-encoding
+ cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+ # removing column with ID field
+ dataset_df = dataset_df.drop('_c0')
+ # binning numeric features with a local binner udf (adjusted per dataset if needed)
+ dataset_df = dataset_df.withColumn('sincephd_bin',
binner(dataset_df['sincephd']))
+ dataset_df = dataset_df.withColumn('service_bin',
binner(dataset_df['service']))
+ dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+ dataset_df = dataset_df.drop('sincephd', 'service')
+ # hot encoding categorical features
+ for feature in cat_features:
+ string_indexer = StringIndexer(inputCol=feature, outputCol=feature +
"_index")
+ encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+ encoder.setDropLast(False)
+ stages += [string_indexer, encoder]
+ assembler_inputs = [feature + "_vec" for feature in cat_features]
+ assembler = VectorAssembler(inputCols=assembler_inputs,
outputCol="assembled_inputs")
+ stages += [assembler]
+ assembler_final = VectorAssembler(inputCols=["assembled_inputs"],
outputCol="features")
+ stages += [assembler_final]
+ pipeline = Pipeline(stages=stages)
+ pipeline_model = pipeline.fit(dataset_df)
+ dataset_transformed = pipeline_model.transform(dataset_df)
+ df_transform_fin = dataset_transformed.select('features', label,
'model_type').toPandas()
+ train, test = train_test_split(df_transform_fin, test_size=0.3,
random_state=0)
+ train_df = sqlContext.createDataFrame(train)
+ test_df = sqlContext.createDataFrame(test)
+ decode_dict = {}
+ counter = 0
+ cat = 0
+ for feature in cat_features:
+ colIdx = dataset_transformed.select(feature, feature +
"_index").distinct().rdd.collectAsMap()
+ colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item:
item[1])}
+ for item in colIdx:
+ decode_dict[counter] = (cat, item, colIdx[item])
+ counter = counter + 1
+ cat = cat + 1
+ lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10,
regParam=0.3, elasticNetParam=0.8)
+ lr_model = lr.fit(train_df)
+ eval = lr_model.evaluate(test_df)
+ f_l2 = eval.meanSquaredError
+ pred = eval.predictions
+ pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label],
pred['prediction'], pred['model_type']))
+ predictions = pred_df_fin.select('features',
'error').repartition(num_partitions)
+ converter = IndexToString(inputCol='features', outputCol='cats')
+ all_features = range(predictions.toPandas().values[0][0].size)
+ predictions = predictions.collect()
+ k = 10
+ if enumerator == "join":
+ spark_slicer.parallel_process(all_features, predictions, f_l2,
sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+ loss_type=loss_type,
enumerator=enumerator)
+ elif enumerator == "union":
+ spark_union_slicer.process(all_features, predictions, f_l2,
sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+ loss_type=loss_type, enumerator=enumerator)
diff --git a/scripts/staging/slicing/tests/regression/test_insurance.py
b/scripts/staging/slicing/tests/regression/test_insurance.py
new file mode 100644
index 0000000..64b58c4
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_insurance.py
@@ -0,0 +1,103 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 4
+ b_update = True
+ debug = True
+ loss_type = 0
+ enumerator = "union"
+ file_name = '/slicing/datasets/insurance.csv'
+ dataset = pd.read_csv(file_name)
+ attributes_amount = len(dataset.values[0])
+ # for now we work with regression datasets, assuming the target attribute is the last column
+ # currently non-categorical features are not supported and should be binned
+ y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+ # all feature columns (target excluded); this dataset has no id column to drop
+ x = dataset.iloc[:, 0:attributes_amount - 1].values
+ # list of numerical columns
+ non_categorical = [1, 3]
+ for row in x:
+ for attribute in non_categorical:
+ # <attribute - 1> converts the 1-based column number into a 0-based index (no id column was dropped here)
+ row[attribute - 1] = int(row[attribute - 1] / 5)
+ # hot encoding of categorical features
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ all_features = enc.get_feature_names()
+ # train model on a whole dataset
+ x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3,
random_state=0)
+ for item in x_test:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ x_size = counter
+ model = LinearRegression()
+ model.fit(x_train, y_train)
+ preds = (model.predict(x_test) - y_test) ** 2
+ f_l2 = sum(preds)/x_size
+ errors = []
+ counter = 0
+ for pred in preds:
+ errors.append((counter, pred))
+ counter = counter + 1
+ # alpha is the size significance coefficient
+ # debug option returns and prints debug info while creating slices
+ # k is the number of top slices we want
+ # w is the weight of the error term; (1 - w) is the size weight propagated into the optimization function
+
+ # enumerator <union>/<join> selects how slices of the next level are combined:
+ # with <join>, the slicer creates new nodes of the current level
+ # by combining only nodes of the previous level with each other;
+ # the <union> case is based on the DPSize algorithm
+ if enumerator == "join":
+ slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors,
debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type, b_update=b_update)
+ elif enumerator == "union":
+ union_slicer.process(all_features, complete_x, f_l2, x_size, y_test,
errors, debug=debug, alpha=alpha, k=k,
+ w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/test_salary.py
b/scripts/staging/slicing/tests/regression/test_salary.py
new file mode 100644
index 0000000..92e0055
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_salary.py
@@ -0,0 +1,104 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+import sys
+
+from slicing.base import slicer, union_slicer
+
+if __name__ == "__main__":
+ args = sys.argv
+ if len(args) > 1:
+ k = int(args[1])
+ w = float(args[2].replace(',', '.'))
+ alpha = int(args[3])
+ if args[4] == "True":
+ b_update = True
+ else:
+ b_update = False
+ debug = args[5]
+ loss_type = int(args[6])
+ enumerator = args[7]
+ else:
+ k = 10
+ w = 0.5
+ alpha = 4
+ b_update = True
+ debug = True
+ loss_type = 0
+ enumerator = "union"
+ file_name = '/slicing/datasets/salaries.csv'
+ dataset = pd.read_csv(file_name)
+ attributes_amount = len(dataset.values[0])
+ # for now we work with regression datasets, assuming the target attribute is the last column
+ # currently non-categorical features are not supported and should be binned
+ y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+ # start from column 1 to exclude the id field
+ x = dataset.iloc[:, 1:attributes_amount - 1].values
+ # list of numerical columns
+ non_categorical = [4, 5]
+ for row in x:
+ for attribute in non_categorical:
+ # <attribute - 2> accounts for the dropped id column plus the shift to 0-based indexing
+ row[attribute - 2] = int(row[attribute - 2] / 5)
+ # hot encoding of categorical features
+ enc = OneHotEncoder(handle_unknown='ignore')
+ x = enc.fit_transform(x).toarray()
+ complete_x = []
+ complete_y = []
+ counter = 0
+ all_features = enc.get_feature_names()
+ # train model on a whole dataset
+ x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3,
random_state=0)
+ for item in x_test:
+ complete_x.append((counter, item))
+ complete_y.append((counter, y_test[counter]))
+ counter = counter + 1
+ x_size = counter
+ model = LinearRegression()
+ model.fit(x_train, y_train)
+ predictions = model.predict(x_test)
+ f_l2 = mean_squared_error(y_test, predictions)
+ preds = (model.predict(x_test) - y_test) ** 2
+ errors = []
+ counter = 0
+ for pred in preds:
+ errors.append((counter, pred))
+ counter = counter + 1
+ # alpha is the size significance coefficient
+ # debug option returns and prints debug info while creating slices
+ # k is the number of top slices we want
+ # w is the weight of the error term; (1 - w) is the size weight propagated into the optimization function
+
+ # enumerator <union>/<join> selects how slices of the next level are combined:
+ # with <join>, the slicer creates new nodes of the current level
+ # by combining only nodes of the previous level with each other;
+ # the <union> case is based on the DPSize algorithm
+ if enumerator == "join":
+ slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors,
debug=debug, alpha=alpha, k=k, w=w,
+ loss_type=loss_type, b_update=b_update)
+ elif enumerator == "union":
+ union_slicer.process(all_features, complete_x, f_l2, x_size, y_test,
errors, debug=debug, alpha=alpha, k=k, w=w,
+ loss_type=loss_type, b_update=b_update)