This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new e78962b  [SYSTEMDS-254] Fixes distributed slice finding implementation
e78962b is described below

commit e78962b8db5b1c90fdb37ae6d9c6284f744cdbfc
Author: gilgenbergg <[email protected]>
AuthorDate: Sat May 23 23:24:30 2020 +0200

    [SYSTEMDS-254] Fixes distributed slice finding implementation
    
    Closes #908.
---
 docs/Tasks.txt                                     |  1 +
 scripts/staging/slicing/base/Bucket.py             |  2 +
 scripts/staging/slicing/base/SparkNode.py          | 70 +++++---------------
 scripts/staging/slicing/base/__init__.py           |  4 +-
 scripts/staging/slicing/base/node.py               | 74 ++++++++++++----------
 scripts/staging/slicing/base/slicer.py             | 12 ++--
 scripts/staging/slicing/base/union_slicer.py       | 19 ++----
 .../slicing/spark_modules/join_data_parallel.py    | 24 +++----
 .../staging/slicing/spark_modules/spark_slicer.py  |  9 +--
 .../slicing/spark_modules/spark_union_slicer.py    |  9 +--
 .../staging/slicing/spark_modules/spark_utils.py   | 15 ++---
 .../slicing/spark_modules/union_data_parallel.py   | 22 ++++---
 .../slicing/tests/classification/__init__.py       |  4 +-
 .../staging/slicing/tests/regression/__init__.py   |  4 +-
 14 files changed, 117 insertions(+), 152 deletions(-)

diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 7b64145..1196566 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -209,6 +209,7 @@ SYSTEMDS-250 Extended Slice Finding
  * 251 Alternative slice enumeration approach                         OK
  * 252 Initial data slicing implementation Python                     OK
  * 253 Distributed slicing algorithms (task/data parallel)            OK
+ * 254 Consolidation and fixes distributed slice finding              OK
 
 SYSTEMDS-260 Misc Tools
  * 261 Stable marriage algorithm                                      OK
diff --git a/scripts/staging/slicing/base/Bucket.py 
b/scripts/staging/slicing/base/Bucket.py
index 0277f6d..dc8402e 100644
--- a/scripts/staging/slicing/base/Bucket.py
+++ b/scripts/staging/slicing/base/Bucket.py
@@ -45,6 +45,8 @@ class Bucket:
         self.parents = []
         self.sum_error = 0
         self.size = 0
+        self.s_upper = 0
+        self.s_lower = 0
         self.score = 0
         self.error = 0
         self.max_tuple_error = 0
diff --git a/scripts/staging/slicing/base/SparkNode.py 
b/scripts/staging/slicing/base/SparkNode.py
index fbaa0bd..a123624 100644
--- a/scripts/staging/slicing/base/SparkNode.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -65,25 +65,16 @@ class SparkNode:
         print(mask)
         if loss_type == 0:
             self.calc_l2(mask)
-        if loss_type == 1:
+        elif loss_type == 1:
             self.calc_class(mask)
 
     def calc_class(self, mask):
         self.e_max = 1
-        size = 0
-        mistakes = 0
-        for row in self.preds:
-            flag = True
-            for attr in mask:
-                if attr not in row[0].indices:
-                    flag = False
-            if flag:
-                size = size + 1
-                if row[1] == 0:
-                    mistakes += 1
-        self.size = size
-        if size != 0:
-            self.loss = mistakes / size
+        filtered = self.filter_by_mask(mask)
+        self.size = len(filtered)
+        mistakes = len(list(filter(lambda row: row[1] == 0, filtered)))
+        if self.size != 0:
+            self.loss = mistakes / self.size
         else:
             self.loss = 0
         self.e_upper = self.loss
@@ -92,25 +83,22 @@ class SparkNode:
         max_tuple_error = 0
         sum_error = 0
         size = 0
-        for row in self.preds:
-            flag = True
-            for attr in mask:
-                if attr not in row[0].indices:
-                    flag = False
-            if flag:
-                size = size + 1
-                if row[1] > max_tuple_error:
-                    max_tuple_error = row[1]
-                sum_error = sum_error + row[1]
+        filtered = self.filter_by_mask(mask)
+        self.size = len(filtered)
+        for row in filtered:
+            if row[1] > max_tuple_error:
+                max_tuple_error = row[1]
+            sum_error += row[1]
         self.e_max = max_tuple_error
         self.e_upper = max_tuple_error
         self.e_max_upper = max_tuple_error
-        if size != 0:
-            self.loss = sum_error/size
+        if self.size != 0:
+            self.loss = sum_error/self.size
         else:
             self.loss = 0
-        self.size = size
-        self.s_upper = size
+
+    def filter_by_mask(self, mask):
+        return list(filter(lambda row: all(attr in row[0].indices for attr in 
mask), self.preds))
 
     def calc_s_upper(self, cur_lvl):
         cur_min = self.parents[0].size
@@ -168,30 +156,6 @@ class SparkNode:
     def check_bounds(self, top_k, x_size, alpha):
         return self.s_upper >= x_size / alpha and self.c_upper >= 
top_k.min_score
 
-    def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
-        try:
-            minimized = min(s_upper, self.s_upper)
-            self.s_upper = minimized
-            minimized = min(s_lower, self.s_lower)
-            self.s_lower = minimized
-            minimized = min(e_upper, self.e_upper)
-            self.e_upper = minimized
-            minimized = min(e_max_upper, self.e_max_upper)
-            self.e_max_upper = minimized
-            c_upper = self.calc_c_upper(w)
-            minimized = min(c_upper, self.c_upper)
-            self.c_upper = minimized
-        except AttributeError:
-            # initial bounds calculation
-            self.s_upper = s_upper
-            self.s_lower = s_lower
-            self.e_upper = e_upper
-            self.e_max_upper = e_max_upper
-            c_upper = self.calc_c_upper(w)
-            self.c_upper = c_upper
-        minimized = min(c_upper, self.c_upper)
-        self.c_upper = minimized
-
     def print_debug(self, topk, level):
         print("new node has been created: " + self.make_name() + "\n")
         if level >= 1:
diff --git a/scripts/staging/slicing/base/__init__.py 
b/scripts/staging/slicing/base/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/base/__init__.py
+++ b/scripts/staging/slicing/base/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,4 +17,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# -------------------------------------------------------------
+#-------------------------------------------------------------
diff --git a/scripts/staging/slicing/base/node.py 
b/scripts/staging/slicing/base/node.py
index 33091a5..4d80651 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -19,6 +19,7 @@
 #
 #-------------------------------------------------------------
 
+
 class Node:
     error: float
     name: ""
@@ -41,9 +42,14 @@ class Node:
         self.parents = []
         self.attributes = []
         self.size = 0
+        self.s_upper = 0
+        self.s_lower = 0
         self.score = 0
         self.complete_x = complete_x
         self.loss = 0
+        self.e_upper = 0
+        self.e_max_upper = 0
+        self.c_upper = 0
         self.x_size = x_size
         self.preds = preds
         self.s_lower = 1
@@ -69,38 +75,27 @@ class Node:
 
     def calc_class(self, mask):
         self.e_max = 1
-        size = 0
         mistakes = 0
-        for row in self.complete_x:
-            flag = True
-            for attr in mask:
-                if row[1][attr] == 0:
-                    flag = False
-            if flag:
-                size = size + 1
-                if self.y_test[row[0]][1] != self.preds[row[0]][1]:
-                    mistakes = mistakes + 1
-        self.size = size
-        if size != 0:
-            self.loss = mistakes / size
+        filtered = self.filter_by_mask(mask)
+        for row in filtered:
+            if self.y_test[row[0]][1] != self.preds[row[0]][1]:
+                mistakes = mistakes + 1
+        self.size = len(filtered)
+        if self.size != 0:
+            self.loss = mistakes / self.size
         else:
             self.loss = 0
         self.e_upper = self.loss
 
     def calc_l2(self, mask):
+        filtered = self.filter_by_mask(mask)
         max_tuple_error = 0
         sum_error = 0
-        size = 0
-        for row in self.complete_x:
-            flag = True
-            for attr in mask:
-                if row[1][attr] == 0:
-                    flag = False
-            if flag:
-                size = size + 1
-                if float(self.preds[row[0]][1]) > max_tuple_error:
+        size = len(filtered)
+        for row in filtered:
+            if float(self.preds[row[0]][1]) > max_tuple_error:
                     max_tuple_error = float(self.preds[row[0]][1])
-                sum_error = sum_error + float(self.preds[row[0]][1])
+            sum_error += float(self.preds[row[0]][1])
         self.e_max = max_tuple_error
         self.e_upper = max_tuple_error
         if size != 0:
@@ -109,6 +104,9 @@ class Node:
             self.loss = 0
         self.size = size
 
+    def filter_by_mask(self, mask):
+        return list(filter(lambda row: all(row[1][attr] == 1 for attr in 
mask), self.complete_x))
+
     def calc_s_upper(self, cur_lvl):
         cur_min = self.parents[0].size
         for parent in self.parents:
@@ -166,28 +164,36 @@ class Node:
         return self.s_upper >= x_size / alpha and self.c_upper >= 
top_k.min_score
 
     def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
-        try:
+        if self.s_upper:
             minimized = min(s_upper, self.s_upper)
             self.s_upper = minimized
+        else:
+            self.s_upper = s_upper
+
+        if self.s_lower:
             minimized = min(s_lower, self.s_lower)
             self.s_lower = minimized
+        else:
+            self.s_lower = s_lower
+
+        if self.e_upper:
             minimized = min(e_upper, self.e_upper)
             self.e_upper = minimized
+        else:
+            self.e_upper = e_upper
+
+        if self.e_max_upper:
             minimized = min(e_max_upper, self.e_max_upper)
             self.e_max_upper = minimized
-            c_upper = self.calc_c_upper(w)
+        else:
+            self.e_max_upper = e_max_upper
+
+        c_upper = self.calc_c_upper(w)
+        if self.c_upper:
             minimized = min(c_upper, self.c_upper)
             self.c_upper = minimized
-        except AttributeError:
-            # initial bounds calculation
-            self.s_upper = s_upper
-            self.s_lower = s_lower
-            self.e_upper = e_upper
-            self.e_max_upper = e_max_upper
-            c_upper = self.calc_c_upper(w)
+        else:
             self.c_upper = c_upper
-        minimized = min(c_upper, self.c_upper)
-        self.c_upper = minimized
 
     def print_debug(self, topk, level):
         print("new node has been created: " + self.make_name() + "\n")
diff --git a/scripts/staging/slicing/base/slicer.py 
b/scripts/staging/slicing/base/slicer.py
index be967b6..a3c4a48 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -38,12 +38,10 @@ def opt_fun(fi, si, f, x_size, w):
 # valid combination example: node ABC + node BCD (on 4th level) // three 
attributes nodes have two common attributes
 # invalid combination example: node ABC + CDE (on 4th level) // result node - 
ABCDE (absurd for 4th level)
 def slice_name_nonsense(node_i, node_j, cur_lvl):
-    commons = 0
-    for attr1 in node_i.attributes:
-        for attr2 in node_j.attributes:
-            if attr1[0].split("_")[0] == attr2[0].split("_")[0]:
-                commons = commons + 1
-    return commons != cur_lvl - 1
+    attr1 = list(map(lambda x: x[0].split("_")[0], node_i.attributes))
+    attr2 = list(map(lambda x: x[0].split("_")[0], node_j.attributes))
+    commons = len(list(set(attr1) & set(attr2)))
+    return commons == cur_lvl - 1
 
 
 def union(lst1, lst2):
@@ -81,7 +79,7 @@ def join_enum(node_i, prev_lvl, complete_x, loss, x_size, 
y_test, errors, debug,
               all_nodes, top_k, cur_lvl_nodes):
     for node_j in range(len(prev_lvl)):
         flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
-        if not flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+        if flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
             new_node = Node(complete_x, loss, x_size, y_test, errors)
             parents_set = set(new_node.parents)
             parents_set.add(prev_lvl[node_i])
diff --git a/scripts/staging/slicing/base/union_slicer.py 
b/scripts/staging/slicing/base/union_slicer.py
index 6b5fb0e..78b7d2b 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -25,14 +25,11 @@ from slicing.base.slicer import opt_fun, union
 
 
 def check_attributes(left_node, right_node):
-    flag = False
-    for attr1 in left_node.attributes:
-        for attr2 in right_node.attributes:
-            if attr1[0].split("_")[0] == attr2[0].split("_")[0]:
-                # there are common attributes which is not the case we need
-                flag = True
-                break
-    return flag
+    attr1 = list(map(lambda x: x[0].split("_")[0], left_node.attributes))
+    attr2 = list(map(lambda x: x[0].split("_")[0], right_node.attributes))
+    if set(attr1).intersection(set(attr2)):
+        return False
+    return True
 
 
 def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, 
loss_type, w, alpha, top_k):
@@ -65,10 +62,6 @@ def make_first_level(all_features, complete_x, loss, x_size, 
y_test, errors, los
     return first_level, all_nodes
 
 
-def union_enum():
-    return None
-
-
 def process(all_features, complete_x, loss, x_size, y_test, errors, debug, 
alpha, k, w, loss_type, b_update):
     top_k = Topk(k)
     # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
@@ -94,7 +87,7 @@ def process(all_features, complete_x, loss, x_size, y_test, 
errors, debug, alpha
             for node_i in range(len(levels[left][0])):
                 for node_j in range(len(levels[right][0])):
                     flag = check_attributes(levels[left][0][node_i], 
levels[right][0][node_j])
-                    if not flag:
+                    if flag:
                         new_node = Node(complete_x, loss, x_size, y_test, 
errors)
                         parents_set = set(new_node.parents)
                         parents_set.add(levels[left][0][node_i])
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py 
b/scripts/staging/slicing/spark_modules/join_data_parallel.py
index 78156c3..4f2ff0e 100644
--- a/scripts/staging/slicing/spark_modules/join_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -74,10 +74,9 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
     x_size = len(pred_pandas)
     b_topk = SparkContext.broadcast(sc, top_k)
     b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
-    b_cur_lvl_nodes = SparkContext.broadcast(sc, cur_lvl_nodes)
     buckets = {}
-    for node in b_cur_lvl_nodes.value:
-        bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+    for node in cur_lvl_nodes:
+        bucket = Bucket(node, cur_lvl, w, x_size, loss)
         buckets[bucket.name] = bucket
     b_buckets = SparkContext.broadcast(sc, buckets)
     rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
@@ -91,16 +90,17 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
         cur_lvl_nodes.map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
     cur_lvl = 1
     prev_level = cur_lvl_nodes.collect()
-    top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+    top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
     while len(prev_level) > 0:
         b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
         b_topk = SparkContext.broadcast(sc, top_k)
         b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
-        b_topk.value.print_topk()
-        buckets = join_enum(b_cur_lvl_nodes.value, b_cur_lvl.value, x_size, 
alpha, b_topk.value, w, loss)
+        top_k.print_topk()
+        buckets = join_enum(prev_level, cur_lvl, x_size, alpha, top_k, w, loss)
         b_buckets = SparkContext.broadcast(sc, buckets)
-        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, b_topk.value), b_buckets.value.items()))
-        mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, top_k), buckets.items()))
+        b_to_slice = SparkContext.broadcast(sc, to_slice)
+        mapped = rows.map(lambda row: rows_mapper(row, b_to_slice.value, 
loss_type))
         flattened = mapped.flatMap(lambda line: (line.items()))
         to_process = flattened.combineByKey(combiner, merge_values, 
merge_combiners)
         if debug:
@@ -108,13 +108,13 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
         prev_level = to_process\
             .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], 
loss, w, x_size, b_cur_lvl.value))\
             .collect()
-        cur_lvl = b_cur_lvl.value + 1
-        top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
-        print("Level " + str(b_cur_lvl.value) + " had " + str(
+        cur_lvl += 1
+        top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+        print("Level " + str(cur_lvl) + " had " + str(
             len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" 
candidates but after pruning only " +
               str(len(prev_level)) + " go to the next level")
         print("Program stopped at level " + str(cur_lvl))
     print()
     print("Selected slices are: ")
-    b_topk.value.print_topk()
+    top_k.print_topk()
     return None
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py 
b/scripts/staging/slicing/spark_modules/spark_slicer.py
index 86f2b34..83de579 100644
--- a/scripts/staging/slicing/spark_modules/spark_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -73,14 +73,15 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
                                                                                
           b_topk.value, w, loss_type)) \
         .map(lambda node: (node.key, node)).collect()
     first_level.update(init_slices)
-    update_top_k(first_level, b_topk.value, alpha, predictions)
+    update_top_k(first_level, top_k, alpha, predictions)
     prev_level = SparkContext.broadcast(sc, first_level)
     levels.append(prev_level)
     cur_lvl = cur_lvl + 1
-    b_topk.value.print_topk()
+    top_k.print_topk()
     # checking the first partition of level. if not empty then processing 
otherwise no elements were added to this level
     while len(levels[cur_lvl - 1].value) > 0:
         nodes_list = {}
+        b_topk = SparkContext.broadcast(sc, top_k)
         partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
         mapped = partitions.mapPartitions(lambda nodes: 
spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
                                                                                
predictions, loss, b_topk.value, alpha, k, w,
@@ -89,7 +90,7 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
         nodes_list.update(flattened.map(lambda node: (node.key, 
node)).distinct().collect())
         prev_level = SparkContext.broadcast(sc, nodes_list)
         levels.append(prev_level)
-        update_top_k(nodes_list, b_topk.value, alpha, predictions)
+        update_top_k(nodes_list, top_k, alpha, predictions)
         cur_lvl = cur_lvl + 1
         b_topk.value.print_topk()
         print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
@@ -97,4 +98,4 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
     print("Program stopped at level " + str(cur_lvl))
     print()
     print("Selected slices are: ")
-    b_topk.value.print_topk()
+    top_k.print_topk()
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py 
b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
index 811baff..02ee716 100644
--- a/scripts/staging/slicing/spark_modules/spark_union_slicer.py
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -40,13 +40,14 @@ def process(all_features, predictions, loss, sc, debug, 
alpha, k, w, loss_type,
         .map(lambda node: (node.key, node)) \
         .collect()
     first_level.update(init_slices)
-    update_top_k(first_level, b_topk.value, alpha, predictions)
+    update_top_k(first_level, top_k, alpha, predictions)
     prev_level = SparkContext.broadcast(sc, first_level)
     levels.append(prev_level)
     cur_lvl = 1
-    b_topk.value.print_topk()
+    top_k.print_topk()
     while len(levels[cur_lvl - 1].value) > 0:
         cur_lvl_res = {}
+        b_topk = SparkContext.broadcast(sc, top_k)
         for left in range(int(cur_lvl / 2) + 1):
             right = cur_lvl - left - 1
             partitions = sc.parallelize(levels[left].value.values())
@@ -59,7 +60,7 @@ def process(all_features, predictions, loss, sc, debug, 
alpha, k, w, loss_type,
             cur_lvl_res.update(partial)
         prev_level = SparkContext.broadcast(sc, cur_lvl_res)
         levels.append(prev_level)
-        update_top_k(cur_lvl_res, b_topk.value, alpha, predictions)
+        update_top_k(cur_lvl_res, top_k, alpha, predictions)
         cur_lvl = cur_lvl + 1
         top_k.print_topk()
         print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
@@ -67,4 +68,4 @@ def process(all_features, predictions, loss, sc, debug, 
alpha, k, w, loss_type,
     print("Program stopped at level " + str(cur_lvl))
     print()
     print("Selected slices are: ")
-    b_topk.value.print_topk()
+    top_k.print_topk()
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py 
b/scripts/staging/slicing/spark_modules/spark_utils.py
index a12c84b..cb83c71 100644
--- a/scripts/staging/slicing/spark_modules/spark_utils.py
+++ b/scripts/staging/slicing/spark_modules/spark_utils.py
@@ -49,6 +49,12 @@ def approved_join_slice(node_i, node_j, cur_lvl):
     return commons == cur_lvl - 1
 
 
+def approved_union_slice(node_i, node_j):
+    if set(node_i.attributes).intersection(set(node_j.attributes)):
+        return False
+    return True
+
+
 def make_first_level(features, predictions, loss, top_k, w, loss_type):
     first_level = []
     # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
@@ -66,15 +72,6 @@ def make_first_level(features, predictions, loss, top_k, w, 
loss_type):
     return first_level
 
 
-def approved_union_slice(node_i, node_j):
-    for attr1 in node_i.attributes:
-        for attr2 in node_j.attributes:
-            if attr1 == attr2:
-                # there are common attributes which is not the case we need
-                return False
-    return True
-
-
 def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, 
loss_type, w, debug, enumerator):
     cur_enum_nodes = []
     for node_j in level:
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py 
b/scripts/staging/slicing/spark_modules/union_data_parallel.py
index b000abe..dcd1af3 100644
--- a/scripts/staging/slicing/spark_modules/union_data_parallel.py
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -67,7 +67,7 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
     b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
     buckets = {}
     for node in cur_lvl_nodes:
-        bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+        bucket = Bucket(node, cur_lvl, w, x_size, loss)
         buckets[bucket.name] = bucket
     b_buckets = SparkContext.broadcast(sc, buckets)
     rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
@@ -83,37 +83,39 @@ def parallel_process(all_features, predictions, loss, sc, 
debug, alpha, k, w, lo
     prev_level = cur_lvl_nodes.collect()
     b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
     levels.append(b_cur_lvl_nodes)
-    top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
-    while len(levels[cur_lvl - 1].value) > 0:
+    top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+    while len(prev_level) > 0:
         b_topk = SparkContext.broadcast(sc, top_k)
         b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
-        b_topk.value.print_topk()
+        top_k.print_topk()
         buckets = []
         for left in range(int(cur_lvl / 2) + 1):
             right = cur_lvl - left - 1
-            nodes = union_enum(levels[left].value, levels[right].value, 
x_size, alpha, b_topk.value, w, loss, b_cur_lvl.value)
+            nodes = union_enum(levels[left].value, levels[right].value, 
x_size, alpha, top_k, w, loss, cur_lvl)
             buckets.append(nodes)
         b_buckets = sc.parallelize(buckets)
         all_buckets = b_buckets.flatMap(lambda line: (line.items()))
         combined = dict(all_buckets.combineByKey(combiner, merge_values, 
merge_combiners).collect())
         b_buckets = SparkContext.broadcast(sc, combined)
-        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, b_topk.value), b_buckets.value.items()))
-        mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, top_k), combined.items()))
+        b_to_slice = SparkContext.broadcast(sc, to_slice)
+        mapped = rows.map(lambda row: rows_mapper(row, b_to_slice.value, 
loss_type))
         flattened = mapped.flatMap(lambda line: (line.items()))
         partial = flattened.combineByKey(combiner, 
join_data_parallel.merge_values, join_data_parallel.merge_combiners)
         prev_level = partial\
             .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], 
loss, w, x_size, b_cur_lvl.value)).collect()
+        top_k = top_k.buckets_top_k(prev_level, x_size, alpha)
+        b_topk = SparkContext.broadcast(sc, top_k)
         if debug:
             partial.values().map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
-        top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
         print("Level " + str(cur_lvl) + " had " + str(
             len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 
1)) +
               " candidates but after pruning only " + str(len(prev_level)) + " 
go to the next level")
         print("Program stopped at level " + str(cur_lvl))
         b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
         levels.append(b_cur_lvl_nodes)
-        cur_lvl = b_cur_lvl.value + 1
+        cur_lvl += 1
     print()
     print("Selected slices are: ")
-    b_topk.value.print_topk()
+    top_k.print_topk()
     return None
diff --git a/scripts/staging/slicing/tests/classification/__init__.py 
b/scripts/staging/slicing/tests/classification/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/tests/classification/__init__.py
+++ b/scripts/staging/slicing/tests/classification/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,4 +17,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# -------------------------------------------------------------
+#-------------------------------------------------------------
diff --git a/scripts/staging/slicing/tests/regression/__init__.py 
b/scripts/staging/slicing/tests/regression/__init__.py
index e66abb4..cc59154 100644
--- a/scripts/staging/slicing/tests/regression/__init__.py
+++ b/scripts/staging/slicing/tests/regression/__init__.py
@@ -1,4 +1,4 @@
-# -------------------------------------------------------------
+#-------------------------------------------------------------
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,4 +17,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# -------------------------------------------------------------
+#-------------------------------------------------------------

Reply via email to