This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-473 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit a9a9d1e63746c5ca54c4bda4f18a1879e8ed3e5e Author: skorper <[email protected]> AuthorDate: Thu Jun 29 19:43:36 2023 -0700 job prioritization --- analysis/setup.py | 24 ++++++++++++---------- analysis/webservice/algorithms_spark/Matchup.py | 10 +++++++++ analysis/webservice/config/scheduler.xml | 10 +++++++++ .../app_builders/SparkContextBuilder.py | 9 +++++++- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/analysis/setup.py b/analysis/setup.py index 99cd707..c09fe4a 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -17,18 +17,19 @@ import setuptools from subprocess import check_call, CalledProcessError -with open('../VERSION.txt', 'r') as f: - __version__ = f.read() +# with open('../VERSION.txt', 'r') as f: +# __version__ = f.read() +__version__ = '1.1.0a3' -try: - check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) -except (CalledProcessError, IOError) as e: - print('Failed install with mamba; falling back to conda') - try: - check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) - except (CalledProcessError, IOError) as e: - raise EnvironmentError("Error installing conda packages", e) +# try: +# check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) +# except (CalledProcessError, IOError) as e: +# print('Failed install with mamba; falling back to conda') +# try: +# check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) +# except (CalledProcessError, IOError) as e: +# raise EnvironmentError("Error installing conda packages", e) setuptools.setup( @@ -60,7 +61,8 @@ setuptools.setup( 'config/algorithms.ini', 'apidocs/index.html', 'apidocs/openapi.yml', - 'apidocs/dataset-populate.js' + 'apidocs/dataset-populate.js', + 'config/scheduler.xml' ], 'webservice.algorithms.doms': ['domsconfig.ini.default'], }, diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py index 4fb40cf..30d8261 100644 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -44,6 +44,8 @@ from webservice.webmodel.NexusExecutionResults import ExecutionStatus EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' +LARGE_JOB_THRESHOLD = 4000 + class Schema: def __init__(self): @@ -234,6 +236,11 @@ class Matchup(NexusCalcSparkTornadoHandler): depth_min, depth_max, time_tolerance, radius_tolerance, \ platforms, match_once, result_size_limit, prioritize_distance + def get_job_pool(self, tile_ids): + if len(tile_ids) > LARGE_JOB_THRESHOLD: + return 'large' + return 'small' + def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, start_time, end_time, depth_min, depth_max, time_tolerance, radius_tolerance, platforms, match_once, @@ -241,8 +248,11 @@ class Matchup(NexusCalcSparkTornadoHandler): # Call spark_matchup self.log.debug("Calling Spark Driver") + job_priority = self.get_job_pool(tile_ids) + try: self._sc.setJobGroup(execution_id, execution_id) + self._sc.setLocalProperty('spark.scheduler.pool', job_priority) spark_result = spark_matchup_driver( tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, diff --git a/analysis/webservice/config/scheduler.xml b/analysis/webservice/config/scheduler.xml new file mode 100644 index 0000000..3016017 --- /dev/null +++ b/analysis/webservice/config/scheduler.xml @@ -0,0 +1,10 @@ +<?xml version="1.0"?> +<allocations> + <pool name="small"> + <weight>1000</weight> + <minShare>1</minShare> + </pool> + <pool name="large"> + <weight>1</weight> + </pool> +</allocations> \ No newline at end of file diff --git a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py index ee3fd2f..5daf279 100644 --- a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py +++ b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import pkg_resources class SparkContextBuilder: @@ -25,7 +26,13 @@ class SparkContextBuilder: if cls.spark_context is None: from pyspark.sql import SparkSession - spark = SparkSession.builder.appName("nexus-analysis").getOrCreate() + scheduler_path = pkg_resources.resource_filename('webservice', "config/scheduler.xml") + + spark = SparkSession.builder.appName("nexus-analysis").config( + "spark.scheduler.allocation.file", scheduler_path + ).config( + "spark.scheduler.mode", "FAIR" + ).getOrCreate() cls.spark_context = spark.sparkContext return cls.spark_context
