This is an automated email from the ASF dual-hosted git repository. rkk pushed a commit to branch SDAP-540-solr-mutex in repository https://gitbox.apache.org/repos/asf/sdap-nexus.git
commit 4a97d8114e758eb85d399aa9f5043c2c637df1f1 Author: rileykk <[email protected]> AuthorDate: Thu Dec 18 10:42:44 2025 -0800 add some log statements + solr mutexing for ds w/d and update scan --- data-access/nexustiles/nexustiles.py | 139 ++++++++++++++++++++--------------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 229762f..9dbbd21 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -215,51 +215,58 @@ class NexusTileService: next_cursor_mark = '*' added_datasets = [] + dataset_docs = [] - while True: - response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc') + with SOLR_LOCK: + update_logger.info('Scanning solr for datasets') + while True: + response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc') - try: - response_cursor_mark = response.nextCursorMark - except AttributeError: - break + try: + response_cursor_mark = response.nextCursorMark + except AttributeError: + break - if response_cursor_mark == next_cursor_mark: - break - else: - next_cursor_mark = response_cursor_mark - - for dataset in response.docs: - d_id = dataset['dataset_s'] - store_type = dataset.get('store_type_s', 'nexusproto') - - present_datasets.add(d_id) - - if d_id in NexusTileService.backends: - if not NexusTileService.backends[d_id]['backend'].update(True): - logger.info(f'Dataset {d_id} of type {store_type} is no longer accessible and will be removed') - present_datasets.remove(d_id) - continue - - added_datasets.append(d_id) - - if store_type == 'nexus_proto' or store_type == 'nexusproto': - update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend") - NexusTileService.backends[d_id] = NexusTileService.backends[None] - elif store_type == 'zarr': - update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend") - - ds_config = json.loads(dataset['config'][0]) - try: - NexusTileService.backends[d_id] = { - 'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config), - 'up': True - } - except NexusTileServiceException: - added_datasets.pop() + if response_cursor_mark == next_cursor_mark: + break else: - update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}') + next_cursor_mark = response_cursor_mark + + dataset_docs.extend(response.docs) + update_logger.info('Finished solr scan') + + for dataset in dataset_docs: + d_id = dataset['dataset_s'] + store_type = dataset.get('store_type_s', 'nexusproto') + + present_datasets.add(d_id) + + if d_id in NexusTileService.backends: + if not NexusTileService.backends[d_id]['backend'].update(True): + logger.info(f'Dataset {d_id} of type {store_type} is no longer accessible and will be removed') + present_datasets.remove(d_id) + continue + + added_datasets.append(d_id) + + if store_type == 'nexus_proto' or store_type == 'nexusproto': + update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend") + NexusTileService.backends[d_id] = NexusTileService.backends[None] + elif store_type == 'zarr': + update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend") + + ds_config = json.loads(dataset['config'][0]) + try: + NexusTileService.backends[d_id] = { + 'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config), + 'up': True + } + except NexusTileServiceException as e: + update_logger.warning(f'Failed to add {d_id}: {e}') added_datasets.pop() + else: + update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}') + added_datasets.pop() removed_datasets = set(NexusTileService.backends.keys()).difference(present_datasets) @@ -272,6 +279,13 @@ class NexusTileService: update_logger.info(f'Finished dataset update: {len(added_datasets)} added, {len(removed_datasets)} removed, ' f'{len(NexusTileService.backends) - 2} total') + update_logger.info('New datasets:') + for dataset in added_datasets: + update_logger.info(f" - {dataset}") + + update_logger.info('Removed datasets:') + for dataset in removed_datasets: + update_logger.info(f" - {dataset}") return added_datasets @@ -294,16 +308,17 @@ class NexusTileService: config_dict['config'] = config - solr.delete(id=ds['id']) - solr.add([{ - 'id': name, - 'dataset_s': name, - 'latest_update_l': int(datetime.now().timestamp()), - 'store_type_s': ds['store_type_s'], - 'config': json.dumps(config_dict), - 'source_s': 'user_added' - }]) - solr.commit() + with SOLR_LOCK: + solr.delete(id=ds['id']) + solr.add([{ + 'id': name, + 'dataset_s': name, + 'latest_update_l': int(datetime.now().timestamp()), + 'store_type_s': ds['store_type_s'], + 'config': json.dumps(config_dict), + 'source_s': 'user_added' + }]) + solr.commit() logger.info(f'Updated dataset {name} in Solr. Updating backends') @@ -327,15 +342,16 @@ class NexusTileService: 'config': config } - solr.add([{ - 'id': name, - 'dataset_s': name, - 'latest_update_l': int(datetime.now().timestamp()), - 'store_type_s': type, - 'config': json.dumps(config_dict), - 'source_s': 'user_added' - }]) - solr.commit() + with SOLR_LOCK: + solr.add([{ + 'id': name, + 'dataset_s': name, + 'latest_update_l': int(datetime.now().timestamp()), + 'store_type_s': type, + 'config': json.dumps(config_dict), + 'source_s': 'user_added' + }]) + solr.commit() logger.info(f'Added dataset {name} to Solr. Updating backends') @@ -365,8 +381,9 @@ class NexusTileService: if 'source_s' not in ds or ds['source_s'] == 'collection_config': raise ValueError('Provided dataset is source_s in collection config and cannot be deleted') - solr.delete(id=ds['id']) - solr.commit() + with SOLR_LOCK: + solr.delete(id=ds['id']) + solr.commit() logger.info(f'Removed dataset {name} from Solr. Updating backends')
