This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git
The following commit(s) were added to refs/heads/develop by this push:
new 5fc0f45 SDAP-472 - Support for defining gridded Zarr datasets from
the collection config (#86)
5fc0f45 is described below
commit 5fc0f45221a6bb1333d776e6488d7186bce90901
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 10:16:55 2024 -0700
SDAP-472 - Support for defining gridded Zarr datasets from the collection
config (#86)
* Zarr onboarding
* Dynamic dataset management
* nexusdatasets setup
* Fix bad field name in Solr doc
* Changelog
* Changelog
---------
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 1 +
.../collection_manager/entities/Collection.py | 21 ++++++++++++++++---
collection_manager/collection_manager/main.py | 1 +
.../services/CollectionProcessor.py | 24 +++++++++++++++++++---
.../services/CollectionWatcher.py | 7 ++++++-
.../history_manager/SolrIngestionHistory.py | 23 ++++++++++++++++++---
6 files changed, 67 insertions(+), 10 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0058cfe..754e47e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
+- SDAP-472: Added support for defining Zarr collections in the collection
config
### Changed
- Improved Collection Manager logging
- Inhibited overly verbose loggers
diff --git a/collection_manager/collection_manager/entities/Collection.py
b/collection_manager/collection_manager/entities/Collection.py
index 3554f34..f2544c6 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -33,6 +33,7 @@ class CollectionStorageType(Enum):
LOCAL = 1
S3 = 2
REMOTE = 3
+ ZARR = 4
@dataclass(frozen=True)
@@ -49,6 +50,8 @@ class Collection:
preprocess: str = None
processors: str = None
group: str = None
+ store_type: str = None
+ config: str = None
@staticmethod
def __decode_dimension_names(dimension_names_dict):
@@ -83,13 +86,20 @@ class Collection:
date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
date_from = datetime.fromisoformat(properties['from']) if 'from'
in properties else None
+ store_type = properties.get('storeType')
+
+ slices = properties.get('slices', {})
+
preprocess = json.dumps(properties['preprocess']) if 'preprocess'
in properties else None
extra_processors = json.dumps(properties['processors']) if
'processors' in properties else None
+ config = properties['config'] if 'config' in properties else None
+
+ projection = properties['projection'] if 'projection' in
properties else None
collection = Collection(dataset_id=properties['id'],
- projection=properties['projection'],
+ projection=projection,
dimension_names=frozenset(Collection.__decode_dimension_names(properties['dimensionNames'])),
-
slices=frozenset(properties['slices'].items()),
+                            slices=frozenset(slices.items()),
path=properties['path'],
historical_priority=properties['priority'],
forward_processing_priority=properties.get('forward-processing-priority', None),
@@ -97,12 +107,17 @@ class Collection:
date_from=date_from,
preprocess=preprocess,
processors=extra_processors,
- group=properties.get('group'))
+ group=properties.get('group'),
+ store_type=store_type,
+ config=config
+ )
return collection
except KeyError as e:
raise MissingValueCollectionError(missing_value=e.args[0])
def storage_type(self):
+ if self.store_type == 'zarr':
+ return CollectionStorageType.ZARR
if urlparse(self.path).scheme == 's3':
return CollectionStorageType.S3
elif urlparse(self.path).scheme in {'http', 'https'}:
diff --git a/collection_manager/collection_manager/main.py
b/collection_manager/collection_manager/main.py
index 452fa25..4f5acf6 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -120,6 +120,7 @@ async def main():
history_manager_builder=history_manager_builder)
collection_watcher =
CollectionWatcher(collections_path=options.collections_path,
granule_updated_callback=collection_processor.process_granule,
+
dataset_added_callback=collection_processor.add_plugin_collection,
collections_refresh_interval=int(options.refresh),
s3_bucket=options.s3_bucket)
diff --git
a/collection_manager/collection_manager/services/CollectionProcessor.py
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 6b8f243..6671a50 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -16,13 +16,14 @@
import json
import logging
import os.path
-from typing import Dict
+from typing import Dict, Optional
import yaml
from collection_manager.entities import Collection
from collection_manager.services import MessagePublisher
from collection_manager.services.history_manager import (GranuleStatus,
- IngestionHistory)
+ IngestionHistory,
+ SolrIngestionHistory)
from collection_manager.services.history_manager.IngestionHistory import \
IngestionHistoryBuilder
@@ -75,12 +76,29 @@ class CollectionProcessor:
await self._publisher.publish_message(body=dataset_config,
priority=use_priority)
await history_manager.push(granule, modified_time)
+ def add_plugin_collection(self, collection: Collection):
+ history_manager = self._get_history_manager(None)
+
+ if isinstance(history_manager, SolrIngestionHistory):
+ collection_config = {
+ 'path': collection.path,
+ 'config': collection.config
+ }
+
+ collection_dimensions = dict(collection.dimension_names)
+
+ collection_config['config']['variables'] =
collection_dimensions['variable']
+ collection_config['config']['coords'] = {dim:
collection_dimensions[dim]
+ for dim in
collection_dimensions if dim != 'variable'}
+
+ history_manager._push_dataset(collection.dataset_id,
collection.store_type, json.dumps(collection_config))
+
@staticmethod
def _file_supported(file_path: str):
ext = os.path.splitext(file_path)[-1]
return ext in SUPPORTED_FILE_EXTENSIONS
- def _get_history_manager(self, dataset_id: str) -> IngestionHistory:
+ def _get_history_manager(self, dataset_id: Optional[str]) ->
IngestionHistory:
if dataset_id not in self._history_manager_cache:
self._history_manager_cache[dataset_id] =
self._history_manager_builder.build(dataset_id=dataset_id)
return self._history_manager_cache[dataset_id]
diff --git
a/collection_manager/collection_manager/services/CollectionWatcher.py
b/collection_manager/collection_manager/services/CollectionWatcher.py
index c28ff6a..94863fd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -42,6 +42,7 @@ class CollectionWatcher:
def __init__(self,
collections_path: str,
granule_updated_callback: Callable[[str, Collection],
Awaitable],
+ dataset_added_callback: Callable[[Collection], None],
s3_bucket: Optional[str] = None,
collections_refresh_interval: float = 30):
if not os.path.isabs(collections_path):
@@ -49,6 +50,7 @@ class CollectionWatcher:
self._collections_path = collections_path
self._granule_updated_callback = granule_updated_callback
+ self._dataset_added_callback = dataset_added_callback
self._collections_refresh_interval = collections_refresh_interval
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
@@ -101,10 +103,13 @@ class CollectionWatcher:
for collection_dict in collections_yaml['collections']:
try:
collection = Collection.from_dict(collection_dict)
- if collection.storage_type() !=
CollectionStorageType.REMOTE:
+ if collection.storage_type() == CollectionStorageType.ZARR:
+ self._dataset_added_callback(collection)
+ elif collection.storage_type() !=
CollectionStorageType.REMOTE:
self._validate_collection(collection)
self._collections_by_dir[collection.directory()].add(collection)
except MissingValueCollectionError as e:
+ logger.exception(e)
logger.error(f"A collection is missing
'{e.missing_value}'. Ignoring this collection for now.")
except RelativePathCollectionError as e:
logger.error(f"Relative paths are not allowed for the
'path' property of a collection. "
diff --git
a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 6aff426..acf91d3 100644
---
a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++
b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -15,7 +15,7 @@
import hashlib
import logging
-
+from datetime import datetime
import pysolr
import requests
from collection_manager.services.history_manager.IngestionHistory import
(IngestionHistory, IngestionHistoryBuilder)
@@ -82,6 +82,20 @@ class SolrIngestionHistory(IngestionHistory):
'latest_update_l': self._latest_ingested_file_update}])
self._solr_datasets.commit()
+ @run_in_executor
+ def _push_dataset(self, dataset_id, type, config):
+ if self._solr_datasets:
+ if len(self._solr_datasets.search(q=f'id:{dataset_id}')) == 0:
+ self._solr_datasets.add([{
+ 'id': dataset_id,
+ 'dataset_s': dataset_id,
+ 'latest_update_l': int(datetime.now().timestamp()),
+ 'store_type_s': type,
+ 'config': config,
+ 'source_s': 'collection_config'
+ }])
+ self._solr_datasets.commit()
+
def _get_latest_file_update(self):
results = self._solr_datasets.search(q=f"id:{self._dataset_id}")
if results:
@@ -141,12 +155,15 @@ class SolrIngestionHistory(IngestionHistory):
schema_endpoint =
f"{self._url_prefix}/{self._dataset_collection_name}/schema"
self._add_field(schema_endpoint, "dataset_s", "string")
self._add_field(schema_endpoint, "latest_update_l",
"TrieLongField")
+ self._add_field(schema_endpoint, "store_type_s", "string",
True)
+ self._add_field(schema_endpoint, "source_s", "string", True)
+ self._add_field(schema_endpoint, "config", "text_general",
True)
except requests.exceptions.RequestException as e:
logger.error(f"solr instance unreachable {self._solr_url}")
raise e
- def _add_field(self, schema_url, field_name, field_type):
+ def _add_field(self, schema_url, field_name, field_type, stored=False):
"""
Helper to add a string field in a solr schema
:param schema_url:
@@ -158,7 +175,7 @@ class SolrIngestionHistory(IngestionHistory):
"add-field": {
"name": field_name,
"type": field_type,
- "stored": False
+ "stored": stored
}
}
return self._req_session.post(schema_url,
data=str(add_field_payload).encode('utf-8'))