This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch feature/SDAP-402 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 6f6e102dc18d92037c52fb766853094ed8260bf4
Author: skorper <[email protected]>
AuthorDate: Tue Sep 27 17:05:45 2022 -0700

    Updated matchup matchOnce logic
---
 analysis/webservice/algorithms_spark/Matchup.py | 46 +++++++++++++++++++++----
 1 file changed, 39 insertions(+), 7 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 5dafb31..4bd33cf 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -349,7 +349,6 @@ class DomsPoint(object):
         self.data = None
 
         self.source = None
-        self.depth = None
         self.platform = None
         self.device = None
         self.file_url = None
@@ -558,7 +557,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
     parameter_b = sc.broadcast(parameter)
 
     # Parallelize list of tile ids
-    rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+    rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
 
     # Map Partitions ( list(tile_id) )
     rdd_filtered = rdd.mapPartitions(
@@ -601,10 +600,43 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
             matchup_time = iso_time_to_epoch(matchup.time)
             return abs(primary_time - matchup_time)
 
-        rdd_filtered = rdd_filtered \
-            .map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \
-            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
-            .mapValues(lambda x: [x[0]])
+        def filter_closest(matches):
+            """
+            Filter given matches. Find the closest match to the primary
+            point and only keep other matches that match the same
+            time/space as that point.
+
+            :param matches: List of match tuples. Each tuple has the following format:
+                1. The secondary match
+                2. Tuple of form (space_dist, time_dist)
+            """
+            closest_point = min(matches, key=lambda match: match[1])[0]
+            matches = list(filter(
+                lambda match: match.latitude == closest_point.latitude and
+                              match.longitude == closest_point.longitude and
+                              match.time == closest_point.time, map(
+                    lambda match: match[0], matches
+                )
+            ))
+            return matches
+
+        rez = rdd_filtered.collect()
+
+        print(f'match_debugging: {type(rez)}')
+        print(f'match_debugging: {len(rez)}')
+        print(f'match_debugging: {type(rez[-1])}')
+        print(f'match_debugging: {len(rez[-1])}')
+        print(f'match_debugging: {type(rez[0][0])}')
+        print(f'match_debugging: {type(rez[0][1])}')
+
+        rdd_filtered = rdd_filtered.map(
+            lambda primary_matchup: tuple(
+                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+            )).combineByKey(
+                lambda value: [value],
+                lambda value_list, value: value_list + [value],
+                lambda value_list_a, value_list_b: value_list_a + value_list_b
+            ).mapValues(lambda matches: filter_closest(matches))
     else:
         rdd_filtered = rdd_filtered \
             .combineByKey(lambda value: [value],  # Create 1 element list
@@ -616,7 +648,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
     return result_as_map
 
 
-def determine_parllelism(num_tiles):
+def determine_parallelism(num_tiles):
     """
     Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
     Also, don't go below the default of 8
