This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new e78962b [SYSTEMDS-254] Fixes distributed slice finding implementation
e78962b is described below
commit e78962b8db5b1c90fdb37ae6d9c6284f744cdbfc
Author: gilgenbergg <[email protected]>
AuthorDate: Sat May 23 23:24:30 2020 +0200
[SYSTEMDS-254] Fixes distributed slice finding implementation
Closes #908.
---
docs/Tasks.txt | 1 +
scripts/staging/slicing/base/Bucket.py | 2 +
scripts/staging/slicing/base/SparkNode.py | 70 +++++---------------
scripts/staging/slicing/base/__init__.py | 4 +-
scripts/staging/slicing/base/node.py | 74 ++++++++++++----------
scripts/staging/slicing/base/slicer.py | 12 ++--
scripts/staging/slicing/base/union_slicer.py | 19 ++----
.../slicing/spark_modules/join_data_parallel.py | 24 +++----
.../staging/slicing/spark_modules/spark_slicer.py | 9 +--
.../slicing/spark_modules/spark_union_slicer.py | 9 +--
.../staging/slicing/spark_modules/spark_utils.py | 15 ++---
.../slicing/spark_modules/union_data_parallel.py | 22 ++++---
.../slicing/tests/classification/__init__.py | 4 +-
.../staging/slicing/tests/regression/__init__.py | 4 +-
14 files changed, 117 insertions(+), 152 deletions(-)
diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 7b64145..1196566 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -209,6 +209,7 @@ SYSTEMDS-250 Extended Slice Finding
* 251 Alternative slice enumeration approach OK
* 252 Initial data slicing implementation Python OK
* 253 Distributed slicing algorithms (task/data parallel) OK
+ * 254 Consolidation and fixes distributed slice finding OK
SYSTEMDS-260 Misc Tools
* 261 Stable marriage algorithm OK
diff --git a/scripts/staging/slicing/base/Bucket.py b/scripts/staging/slicing/base/Bucket.py
index 0277f6d..dc8402e 100644
--- a/scripts/staging/slicing/base/Bucket.py
+++ b/scripts/staging/slicing/base/Bucket.py
@@ -45,6 +45,8 @@ class Bucket:
self.parents = []
self.sum_error = 0
self.size = 0
+ self.s_upper = 0
+ self.s_lower = 0
self.score = 0
self.error = 0
self.max_tuple_error = 0
diff --git a/scripts/staging/slicing/base/SparkNode.py b/scripts/staging/slicing/base/SparkNode.py
index fbaa0bd..a123624 100644
--- a/scripts/staging/slicing/base/SparkNode.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -65,25 +65,16 @@ class SparkNode:
print(mask)
if loss_type == 0:
self.calc_l2(mask)
- if loss_type == 1:
+ elif loss_type == 1:
self.calc_class(mask)
def calc_class(self, mask):
self.e_max = 1
- size = 0
- mistakes = 0
- for row in self.preds:
- flag = True
- for attr in mask:
- if attr not in row[0].indices:
- flag = False
- if flag:
- size = size + 1
- if row[1] == 0:
- mistakes += 1
- self.size = size
- if size != 0:
- self.loss = mistakes / size
+ filtered = self.filter_by_mask(mask)
+ self.size = len(filtered)
+ mistakes = len(list(filter(lambda row: row[1] == 0, filtered)))
+ if self.size != 0:
+ self.loss = mistakes / self.size
else:
self.loss = 0
self.e_upper = self.loss
@@ -92,25 +83,22 @@ class SparkNode:
max_tuple_error = 0
sum_error = 0
size = 0
- for row in self.preds:
- flag = True
- for attr in mask:
- if attr not in row[0].indices:
- flag = False
- if flag:
- size = size + 1
- if row[1] > max_tuple_error:
- max_tuple_error = row[1]
- sum_error = sum_error + row[1]
+ filtered = self.filter_by_mask(mask)
+ self.size = len(filtered)
+ for row in filtered:
+ if row[1] > max_tuple_error:
+ max_tuple_error = row[1]
+ sum_error += row[1]
self.e_max = max_tuple_error
self.e_upper = max_tuple_error
self.e_max_upper = max_tuple_error
- if size != 0:
- self.loss = sum_error/size
+ if self.size != 0:
+ self.loss = sum_error/self.size
else:
self.loss = 0
- self.size = size
- self.s_upper = size
+
+ def filter_by_mask(self, mask):
+ return list(filter(lambda row: all(attr in row[0].indices for attr in mask), self.preds))
def calc_s_upper(self, cur_lvl):
cur_min = self.parents[0].size
@@ -168,30 +156,6 @@ class SparkNode:
def check_bounds(self, top_k, x_size, alpha):
return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
- def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
- try:
- minimized = min(s_upper, self.s_upper)
- self.s_upper = minimized
- minimized = min(s_lower, self.s_lower)
- self.s_lower = minimized
- minimized = min(e_upper, self.e_upper)
- self.e_upper = minimized
- minimized = min(e_max_upper, self.e_max_upper)
- self.e_max_upper = minimized
- c_upper = self.calc_c_upper(w)
- minimized = min(c_upper, self.c_upper)
- self.c_upper = minimized
- except AttributeError:
- # initial bounds calculation
- self.s_upper = s_upper
- self.s_lower = s_lower
- self.e_upper = e_upper
- self.e_max_upper = e_max_upper
- c_upper = self.calc_c_upper(w)
- self.c_upper = c_upper
- minimized = min(c_upper, self.c_upper)
- self.c_upper = minimized
-
def print_debug(self, topk, level):
print("new node has been created: " + self.make_name() + "\n")
if level >= 1:
diff --git a/scripts/staging/slicing/base/__init__.py b/scripts/staging/slicing/base/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/base/__init__.py
+++ b/scripts/staging/slicing/base/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.
#
-# -------------------------------------------------------------
+#-------------------------------------------------------------
diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/node.py
index 33091a5..4d80651 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -19,6 +19,7 @@
#
#-------------------------------------------------------------
+
class Node:
error: float
name: ""
@@ -41,9 +42,14 @@ class Node:
self.parents = []
self.attributes = []
self.size = 0
+ self.s_upper = 0
+ self.s_lower = 0
self.score = 0
self.complete_x = complete_x
self.loss = 0
+ self.e_upper = 0
+ self.e_max_upper = 0
+ self.c_upper = 0
self.x_size = x_size
self.preds = preds
self.s_lower = 1
@@ -69,38 +75,27 @@ class Node:
def calc_class(self, mask):
self.e_max = 1
- size = 0
mistakes = 0
- for row in self.complete_x:
- flag = True
- for attr in mask:
- if row[1][attr] == 0:
- flag = False
- if flag:
- size = size + 1
- if self.y_test[row[0]][1] != self.preds[row[0]][1]:
- mistakes = mistakes + 1
- self.size = size
- if size != 0:
- self.loss = mistakes / size
+ filtered = self.filter_by_mask(mask)
+ for row in filtered:
+ if self.y_test[row[0]][1] != self.preds[row[0]][1]:
+ mistakes = mistakes + 1
+ self.size = len(filtered)
+ if self.size != 0:
+ self.loss = mistakes / self.size
else:
self.loss = 0
self.e_upper = self.loss
def calc_l2(self, mask):
+ filtered = self.filter_by_mask(mask)
max_tuple_error = 0
sum_error = 0
- size = 0
- for row in self.complete_x:
- flag = True
- for attr in mask:
- if row[1][attr] == 0:
- flag = False
- if flag:
- size = size + 1
- if float(self.preds[row[0]][1]) > max_tuple_error:
+ size = len(filtered)
+ for row in filtered:
+ if float(self.preds[row[0]][1]) > max_tuple_error:
max_tuple_error = float(self.preds[row[0]][1])
- sum_error = sum_error + float(self.preds[row[0]][1])
+ sum_error += float(self.preds[row[0]][1])
self.e_max = max_tuple_error
self.e_upper = max_tuple_error
if size != 0:
@@ -109,6 +104,9 @@ class Node:
self.loss = 0
self.size = size
+ def filter_by_mask(self, mask):
+ return list(filter(lambda row: all(row[1][attr] == 1 for attr in mask), self.complete_x))
+
def calc_s_upper(self, cur_lvl):
cur_min = self.parents[0].size
for parent in self.parents:
@@ -166,28 +164,36 @@ class Node:
return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score
def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
- try:
+ if self.s_upper:
minimized = min(s_upper, self.s_upper)
self.s_upper = minimized
+ else:
+ self.s_upper = s_upper
+
+ if self.s_lower:
minimized = min(s_lower, self.s_lower)
self.s_lower = minimized
+ else:
+ self.s_lower = s_lower
+
+ if self.e_upper:
minimized = min(e_upper, self.e_upper)
self.e_upper = minimized
+ else:
+ self.e_upper = e_upper
+
+ if self.e_max_upper:
minimized = min(e_max_upper, self.e_max_upper)
self.e_max_upper = minimized
- c_upper = self.calc_c_upper(w)
+ else:
+ self.e_max_upper = e_max_upper
+
+ c_upper = self.calc_c_upper(w)
+ if self.c_upper:
minimized = min(c_upper, self.c_upper)
self.c_upper = minimized
- except AttributeError:
- # initial bounds calculation
- self.s_upper = s_upper
- self.s_lower = s_lower
- self.e_upper = e_upper
- self.e_max_upper = e_max_upper
- c_upper = self.calc_c_upper(w)
+ else:
self.c_upper = c_upper
- minimized = min(c_upper, self.c_upper)
- self.c_upper = minimized
def print_debug(self, topk, level):
print("new node has been created: " + self.make_name() + "\n")
diff --git a/scripts/staging/slicing/base/slicer.py b/scripts/staging/slicing/base/slicer.py
index be967b6..a3c4a48 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -38,12 +38,10 @@ def opt_fun(fi, si, f, x_size, w):
# valid combination example: node ABC + node BCD (on 4th level) // three attributes nodes have two common attributes
# invalid combination example: node ABC + CDE (on 4th level) // result node - ABCDE (absurd for 4th level)
def slice_name_nonsense(node_i, node_j, cur_lvl):
- commons = 0
- for attr1 in node_i.attributes:
- for attr2 in node_j.attributes:
- if attr1[0].split("_")[0] == attr2[0].split("_")[0]:
- commons = commons + 1
- return commons != cur_lvl - 1
+ attr1 = list(map(lambda x: x[0].split("_")[0], node_i.attributes))
+ attr2 = list(map(lambda x: x[0].split("_")[0], node_j.attributes))
+ commons = len(list(set(attr1) & set(attr2)))
+ return commons == cur_lvl - 1
def union(lst1, lst2):
@@ -81,7 +79,7 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug,
all_nodes, top_k, cur_lvl_nodes):
for node_j in range(len(prev_lvl)):
flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
- if not flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+ if flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
new_node = Node(complete_x, loss, x_size, y_test, errors)
parents_set = set(new_node.parents)
parents_set.add(prev_lvl[node_i])
diff --git a/scripts/staging/slicing/base/union_slicer.py b/scripts/staging/slicing/base/union_slicer.py
index 6b5fb0e..78b7d2b 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -25,14 +25,11 @@ from slicing.base.slicer import opt_fun, union
def check_attributes(left_node, right_node):
- flag = False
- for attr1 in left_node.attributes:
- for attr2 in right_node.attributes:
- if attr1[0].split("_")[0] == attr2[0].split("_")[0]:
- # there are common attributes which is not the case we need
- flag = True
- break
- return flag
+ attr1 = list(map(lambda x: x[0].split("_")[0], left_node.attributes))
+ attr2 = list(map(lambda x: x[0].split("_")[0], right_node.attributes))
+ if set(attr1).intersection(set(attr2)):
+ return False
+ return True
def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k):
@@ -65,10 +62,6 @@ def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, los
return first_level, all_nodes
-def union_enum():
- return None
-
-
def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type, b_update):
top_k = Topk(k)
# First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
@@ -94,7 +87,7 @@ def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha
for node_i in range(len(levels[left][0])):
for node_j in range(len(levels[right][0])):
flag = check_attributes(levels[left][0][node_i], levels[right][0][node_j])
- if not flag:
+ if flag:
+ new_node = Node(complete_x, loss, x_size, y_test, errors)
parents_set = set(new_node.parents)
parents_set.add(levels[left][0][node_i])
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py b/scripts/staging/slicing/spark_modules/join_data_parallel.py
index 78156c3..4f2ff0e 100644
--- a/scripts/staging/slicing/spark_modules/join_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -74,10 +74,9 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
x_size = len(pred_pandas)
b_topk = SparkContext.broadcast(sc, top_k)
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
- b_cur_lvl_nodes = SparkContext.broadcast(sc, cur_lvl_nodes)
buckets = {}
- for node in b_cur_lvl_nodes.value:
- bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+ for node in cur_lvl_nodes:
+ bucket = Bucket(node, cur_lvl, w, x_size, loss)
buckets[bucket.name] = bucket
b_buckets = SparkContext.broadcast(sc, buckets)
rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
@@ -91,16 +90,17 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
cur_lvl_nodes.map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
cur_lvl = 1
prev_level = cur_lvl_nodes.collect()
- top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
while len(prev_level) > 0:
b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
b_topk = SparkContext.broadcast(sc, top_k)
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
- b_topk.value.print_topk()
- buckets = join_enum(b_cur_lvl_nodes.value, b_cur_lvl.value, x_size, alpha, b_topk.value, w, loss)
+ top_k.print_topk()
+ buckets = join_enum(prev_level, cur_lvl, x_size, alpha, top_k, w, loss)
b_buckets = SparkContext.broadcast(sc, buckets)
- to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, b_topk.value), b_buckets.value.items()))
- mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+ to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, top_k), buckets.items()))
+ b_to_slice = SparkContext.broadcast(sc, to_slice)
+ mapped = rows.map(lambda row: rows_mapper(row, b_to_slice.value, loss_type))
flattened = mapped.flatMap(lambda line: (line.items()))
to_process = flattened.combineByKey(combiner, merge_values, merge_combiners)
if debug:
@@ -108,13 +108,13 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
prev_level = to_process\
.map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value))\
.collect()
- cur_lvl = b_cur_lvl.value + 1
- top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
- print("Level " + str(b_cur_lvl.value) + " had " + str(
+ cur_lvl += 1
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ print("Level " + str(cur_lvl) + " had " + str(
len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" candidates but after pruning only " +
str(len(prev_level)) + " go to the next level")
print("Program stopped at level " + str(cur_lvl))
print()
print("Selected slices are: ")
- b_topk.value.print_topk()
+ top_k.print_topk()
return None
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py b/scripts/staging/slicing/spark_modules/spark_slicer.py
index 86f2b34..83de579 100644
--- a/scripts/staging/slicing/spark_modules/spark_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -73,14 +73,15 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
b_topk.value, w, loss_type)) \
.map(lambda node: (node.key, node)).collect()
first_level.update(init_slices)
- update_top_k(first_level, b_topk.value, alpha, predictions)
+ update_top_k(first_level, top_k, alpha, predictions)
prev_level = SparkContext.broadcast(sc, first_level)
levels.append(prev_level)
cur_lvl = cur_lvl + 1
- b_topk.value.print_topk()
+ top_k.print_topk()
# checking the first partition of level. if not empty then processing otherwise no elements were added to this level
while len(levels[cur_lvl - 1].value) > 0:
nodes_list = {}
+ b_topk = SparkContext.broadcast(sc, top_k)
partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
predictions, loss, b_topk.value, alpha, k, w,
@@ -89,7 +90,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
nodes_list.update(flattened.map(lambda node: (node.key, node)).distinct().collect())
prev_level = SparkContext.broadcast(sc, nodes_list)
levels.append(prev_level)
- update_top_k(nodes_list, b_topk.value, alpha, predictions)
+ update_top_k(nodes_list, top_k, alpha, predictions)
cur_lvl = cur_lvl + 1
b_topk.value.print_topk()
print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl -
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
@@ -97,4 +98,4 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
print("Program stopped at level " + str(cur_lvl))
print()
print("Selected slices are: ")
- b_topk.value.print_topk()
+ top_k.print_topk()
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
index 811baff..02ee716 100644
--- a/scripts/staging/slicing/spark_modules/spark_union_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -40,13 +40,14 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
.map(lambda node: (node.key, node)) \
.collect()
first_level.update(init_slices)
- update_top_k(first_level, b_topk.value, alpha, predictions)
+ update_top_k(first_level, top_k, alpha, predictions)
prev_level = SparkContext.broadcast(sc, first_level)
levels.append(prev_level)
cur_lvl = 1
- b_topk.value.print_topk()
+ top_k.print_topk()
while len(levels[cur_lvl - 1].value) > 0:
cur_lvl_res = {}
+ b_topk = SparkContext.broadcast(sc, top_k)
for left in range(int(cur_lvl / 2) + 1):
right = cur_lvl - left - 1
partitions = sc.parallelize(levels[left].value.values())
@@ -59,7 +60,7 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
cur_lvl_res.update(partial)
prev_level = SparkContext.broadcast(sc, cur_lvl_res)
levels.append(prev_level)
- update_top_k(cur_lvl_res, b_topk.value, alpha, predictions)
+ update_top_k(cur_lvl_res, top_k, alpha, predictions)
cur_lvl = cur_lvl + 1
top_k.print_topk()
print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl -
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
@@ -67,4 +68,4 @@ def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type,
print("Program stopped at level " + str(cur_lvl))
print()
print("Selected slices are: ")
- b_topk.value.print_topk()
+ top_k.print_topk()
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py b/scripts/staging/slicing/spark_modules/spark_utils.py
index a12c84b..cb83c71 100644
--- a/scripts/staging/slicing/spark_modules/spark_utils.py
+++ b/scripts/staging/slicing/spark_modules/spark_utils.py
@@ -49,6 +49,12 @@ def approved_join_slice(node_i, node_j, cur_lvl):
return commons == cur_lvl - 1
+def approved_union_slice(node_i, node_j):
+ if set(node_i.attributes).intersection(set(node_j.attributes)):
+ return False
+ return True
+
+
def make_first_level(features, predictions, loss, top_k, w, loss_type):
first_level = []
# First level slices are enumerated in a "classic way" (getting data and not analyzing bounds
@@ -66,15 +72,6 @@ def make_first_level(features, predictions, loss, top_k, w, loss_type):
return first_level
-def approved_union_slice(node_i, node_j):
- for attr1 in node_i.attributes:
- for attr2 in node_j.attributes:
- if attr1 == attr2:
- # there are common attributes which is not the case we need
- return False
- return True
-
-
def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, loss_type, w, debug, enumerator):
cur_enum_nodes = []
for node_j in level:
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py b/scripts/staging/slicing/spark_modules/union_data_parallel.py
index b000abe..dcd1af3 100644
--- a/scripts/staging/slicing/spark_modules/union_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -67,7 +67,7 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
buckets = {}
for node in cur_lvl_nodes:
- bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+ bucket = Bucket(node, cur_lvl, w, x_size, loss)
buckets[bucket.name] = bucket
b_buckets = SparkContext.broadcast(sc, buckets)
rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
@@ -83,37 +83,39 @@ def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, lo
prev_level = cur_lvl_nodes.collect()
b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
levels.append(b_cur_lvl_nodes)
- top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
- while len(levels[cur_lvl - 1].value) > 0:
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ while len(prev_level) > 0:
b_topk = SparkContext.broadcast(sc, top_k)
b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
- b_topk.value.print_topk()
+ top_k.print_topk()
buckets = []
for left in range(int(cur_lvl / 2) + 1):
right = cur_lvl - left - 1
- nodes = union_enum(levels[left].value, levels[right].value, x_size, alpha, b_topk.value, w, loss, b_cur_lvl.value)
+ nodes = union_enum(levels[left].value, levels[right].value, x_size, alpha, top_k, w, loss, cur_lvl)
buckets.append(nodes)
b_buckets = sc.parallelize(buckets)
all_buckets = b_buckets.flatMap(lambda line: (line.items()))
combined = dict(all_buckets.combineByKey(combiner, merge_values, merge_combiners).collect())
b_buckets = SparkContext.broadcast(sc, combined)
- to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, b_topk.value), b_buckets.value.items()))
- mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+ to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, top_k), combined.items()))
+ b_to_slice = SparkContext.broadcast(sc, to_slice)
+ mapped = rows.map(lambda row: rows_mapper(row, b_to_slice.value, loss_type))
flattened = mapped.flatMap(lambda line: (line.items()))
partial = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners)
prev_level = partial\
.map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value)).collect()
+ top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+ b_topk = SparkContext.broadcast(sc, top_k)
if debug:
partial.values().map(lambda bucket: bucket.print_debug(b_topk.value)).collect()
- top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
print("Level " + str(cur_lvl) + " had " + str(
len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
" candidates but after pruning only " + str(len(prev_level)) + "
go to the next level")
print("Program stopped at level " + str(cur_lvl))
b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
levels.append(b_cur_lvl_nodes)
- cur_lvl = b_cur_lvl.value + 1
+ cur_lvl += 1
print()
print("Selected slices are: ")
- b_topk.value.print_topk()
+ top_k.print_topk()
return None
diff --git a/scripts/staging/slicing/tests/classification/__init__.py b/scripts/staging/slicing/tests/classification/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/tests/classification/__init__.py
+++ b/scripts/staging/slicing/tests/classification/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.
#
-# -------------------------------------------------------------
+#-------------------------------------------------------------
diff --git a/scripts/staging/slicing/tests/regression/__init__.py b/scripts/staging/slicing/tests/regression/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/tests/regression/__init__.py
+++ b/scripts/staging/slicing/tests/regression/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.
#
-# -------------------------------------------------------------
+#-------------------------------------------------------------