This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch feature/SDAP-402
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 6f6e102dc18d92037c52fb766853094ed8260bf4
Author: skorper <[email protected]>
AuthorDate: Tue Sep 27 17:05:45 2022 -0700

    Updated matchup matchOnce logic
---
 analysis/webservice/algorithms_spark/Matchup.py | 46 +++++++++++++++++++++----
 1 file changed, 39 insertions(+), 7 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 5dafb31..4bd33cf 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -349,7 +349,6 @@ class DomsPoint(object):
         self.data = None
 
         self.source = None
-        self.depth = None
         self.platform = None
         self.device = None
         self.file_url = None
@@ -558,7 +557,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
         parameter_b = sc.broadcast(parameter)
 
         # Parallelize list of tile ids
-        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
 
     # Map Partitions ( list(tile_id) )
     rdd_filtered = rdd.mapPartitions(
@@ -601,10 +600,43 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
             matchup_time = iso_time_to_epoch(matchup.time)
             return abs(primary_time - matchup_time)
 
-        rdd_filtered = rdd_filtered \
-            .map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \
-            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
-            .mapValues(lambda x: [x[0]])
+        def filter_closest(matches):
+            """
+            Filter given matches. Find the closest match to the primary
+            point and only keep other matches that match the same
+            time/space as that point.
+
+            :param matches: List of match tuples. Each tuple has the following format:
+                1. The secondary match
+                2. Tuple of form (space_dist, time_dist)
+            """
+            closest_point = min(matches, key=lambda match: match[1])[0]
+            matches = list(filter(
+                lambda match: match.latitude == closest_point.latitude and
+                              match.longitude == closest_point.longitude and
+                              match.time == closest_point.time, map(
+                    lambda match: match[0], matches
+                )
+            ))
+            return matches
+
+        rez = rdd_filtered.collect()
+
+        print(f'match_debugging: {type(rez)}')
+        print(f'match_debugging: {len(rez)}')
+        print(f'match_debugging: {type(rez[-1])}')
+        print(f'match_debugging: {len(rez[-1])}')
+        print(f'match_debugging: {type(rez[0][0])}')
+        print(f'match_debugging: {type(rez[0][1])}')
+
+        rdd_filtered = rdd_filtered.map(
+            lambda primary_matchup: tuple(
+                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+            )).combineByKey(
+                lambda value: [value],
+                lambda value_list, value: value_list + [value],
+                lambda value_list_a, value_list_b: value_list_a + value_list_b
+            ).mapValues(lambda matches: filter_closest(matches))
     else:
         rdd_filtered = rdd_filtered \
             .combineByKey(lambda value: [value],  # Create 1 element list
@@ -616,7 +648,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
     return result_as_map
 
 
-def determine_parllelism(num_tiles):
+def determine_parallelism(num_tiles):
     """
     Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
     Also, don't go below the default of 8

Reply via email to