This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5fc0f45  SDAP-472 - Support for defining gridded Zarr datasets from the collection config (#86)
5fc0f45 is described below

commit 5fc0f45221a6bb1333d776e6488d7186bce90901
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 10:16:55 2024 -0700

    SDAP-472 - Support for defining gridded Zarr datasets from the collection config (#86)
    
    * Zarr onboarding
    
    * Dynamic dataset management
    
    * nexusdatasets setup
    
    * Fix bad field name in Solr doc
    
    * Changelog
    
    * Changelog
    
    ---------
    
    Co-authored-by: rileykk <[email protected]>
---
 CHANGELOG.md                                       |  1 +
 .../collection_manager/entities/Collection.py      | 21 ++++++++++++++++---
 collection_manager/collection_manager/main.py      |  1 +
 .../services/CollectionProcessor.py                | 24 +++++++++++++++++++---
 .../services/CollectionWatcher.py                  |  7 ++++++-
 .../history_manager/SolrIngestionHistory.py        | 23 ++++++++++++++++++---
 6 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0058cfe..754e47e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 ### Added
+- SDAP-472: Added support for defining Zarr collections in the collection config
 ### Changed
 - Improved Collection Manager logging
   - Inhibited overly verbose loggers
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 3554f34..f2544c6 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -33,6 +33,7 @@ class CollectionStorageType(Enum):
     LOCAL = 1
     S3 = 2
     REMOTE = 3
+    ZARR = 4
 
 
 @dataclass(frozen=True)
@@ -49,6 +50,8 @@ class Collection:
     preprocess: str = None
     processors: str = None
     group: str = None
+    store_type: str = None
+    config: str = None
 
     @staticmethod
     def __decode_dimension_names(dimension_names_dict):
@@ -83,13 +86,20 @@ class Collection:
             date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
             date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
 
+            store_type = properties.get('storeType')
+
+            slices = properties.get('slices', {})
+
             preprocess = json.dumps(properties['preprocess']) if 'preprocess' in properties else None
             extra_processors = json.dumps(properties['processors']) if 'processors' in properties else None
+            config = properties['config'] if 'config' in properties else None
+
+            projection = properties['projection'] if 'projection' in properties else None
 
             collection = Collection(dataset_id=properties['id'],
-                                    projection=properties['projection'],
+                                    projection=projection,
                                     dimension_names=frozenset(Collection.__decode_dimension_names(properties['dimensionNames'])),
-                                    slices=frozenset(properties['slices'].items()),
+                                    slices=frozenset(slices),
                                     path=properties['path'],
                                     historical_priority=properties['priority'],
                                     forward_processing_priority=properties.get('forward-processing-priority', None),
@@ -97,12 +107,17 @@ class Collection:
                                     date_from=date_from,
                                     preprocess=preprocess,
                                     processors=extra_processors,
-                                    group=properties.get('group'))
+                                    group=properties.get('group'),
+                                    store_type=store_type,
+                                    config=config
+                                    )
             return collection
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
     def storage_type(self):
+        if self.store_type == 'zarr':
+            return CollectionStorageType.ZARR
         if urlparse(self.path).scheme == 's3':
             return CollectionStorageType.S3
         elif urlparse(self.path).scheme in {'http', 'https'}:
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 452fa25..4f5acf6 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -120,6 +120,7 @@ async def main():
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
+                                                   dataset_added_callback=collection_processor.add_plugin_collection,
                                                    collections_refresh_interval=int(options.refresh),
                                                    s3_bucket=options.s3_bucket)
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 6b8f243..6671a50 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -16,13 +16,14 @@
 import json
 import logging
 import os.path
-from typing import Dict
+from typing import Dict, Optional
 
 import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
 from collection_manager.services.history_manager import (GranuleStatus,
-                                                         IngestionHistory)
+                                                         IngestionHistory,
+                                                         SolrIngestionHistory)
 from collection_manager.services.history_manager.IngestionHistory import \
     IngestionHistoryBuilder
 
@@ -75,12 +76,29 @@ class CollectionProcessor:
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         await history_manager.push(granule, modified_time)
 
+    def add_plugin_collection(self, collection: Collection):
+        history_manager = self._get_history_manager(None)
+
+        if isinstance(history_manager, SolrIngestionHistory):
+            collection_config = {
+                'path': collection.path,
+                'config': collection.config
+            }
+
+            collection_dimensions = dict(collection.dimension_names)
+
+            collection_config['config']['variables'] = collection_dimensions['variable']
+            collection_config['config']['coords'] = {dim: collection_dimensions[dim]
+                                                     for dim in collection_dimensions if dim != 'variable'}
+
+            history_manager._push_dataset(collection.dataset_id, collection.store_type, json.dumps(collection_config))
+
     @staticmethod
     def _file_supported(file_path: str):
         ext = os.path.splitext(file_path)[-1]
         return ext in SUPPORTED_FILE_EXTENSIONS
 
-    def _get_history_manager(self, dataset_id: str) -> IngestionHistory:
+    def _get_history_manager(self, dataset_id: Optional[str]) -> IngestionHistory:
         if dataset_id not in self._history_manager_cache:
             self._history_manager_cache[dataset_id] = self._history_manager_builder.build(dataset_id=dataset_id)
         return self._history_manager_cache[dataset_id]
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index c28ff6a..94863fd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -42,6 +42,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
+                 dataset_added_callback: Callable[[Collection], None],
                  s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
@@ -49,6 +50,7 @@ class CollectionWatcher:
 
         self._collections_path = collections_path
         self._granule_updated_callback = granule_updated_callback
+        self._dataset_added_callback = dataset_added_callback
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
@@ -101,10 +103,13 @@ class CollectionWatcher:
             for collection_dict in collections_yaml['collections']:
                 try:
                     collection = Collection.from_dict(collection_dict)
-                    if collection.storage_type() != CollectionStorageType.REMOTE:
+                    if collection.storage_type() == CollectionStorageType.ZARR:
+                        self._dataset_added_callback(collection)
+                    elif collection.storage_type() != CollectionStorageType.REMOTE:
                         self._validate_collection(collection)
                         self._collections_by_dir[collection.directory()].add(collection)
                 except MissingValueCollectionError as e:
+                    logger.exception(e)
                    logger.error(f"A collection is missing '{e.missing_value}'. Ignoring this collection for now.")
                 except RelativePathCollectionError as e:
                    logger.error(f"Relative paths are not allowed for the 'path' property of a collection. "
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 6aff426..acf91d3 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -15,7 +15,7 @@
 
 import hashlib
 import logging
-
+from datetime import datetime
 import pysolr
 import requests
 from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
@@ -82,6 +82,20 @@ class SolrIngestionHistory(IngestionHistory):
                 'latest_update_l': self._latest_ingested_file_update}])
             self._solr_datasets.commit()
 
+    @run_in_executor
+    def _push_dataset(self, dataset_id, type, config):
+        if self._solr_datasets:
+            if len(self._solr_datasets.search(q=f'id:{dataset_id}')) == 0:
+                self._solr_datasets.add([{
+                    'id': dataset_id,
+                    'dataset_s': dataset_id,
+                    'latest_update_l': int(datetime.now().timestamp()),
+                    'store_type_s': type,
+                    'config': config,
+                    'source_s': 'collection_config'
+                }])
+                self._solr_datasets.commit()
+
     def _get_latest_file_update(self):
         results = self._solr_datasets.search(q=f"id:{self._dataset_id}")
         if results:
@@ -141,12 +155,15 @@ class SolrIngestionHistory(IngestionHistory):
                schema_endpoint = f"{self._url_prefix}/{self._dataset_collection_name}/schema"
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
+                self._add_field(schema_endpoint, "store_type_s", "string", True)
+                self._add_field(schema_endpoint, "source_s", "string", True)
+                self._add_field(schema_endpoint, "config", "text_general", True)
 
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
 
-    def _add_field(self, schema_url, field_name, field_type):
+    def _add_field(self, schema_url, field_name, field_type, stored=False):
         """
         Helper to add a string field in a solr schema
         :param schema_url:
@@ -158,7 +175,7 @@ class SolrIngestionHistory(IngestionHistory):
             "add-field": {
                 "name": field_name,
                 "type": field_type,
-                "stored": False
+                "stored": stored
             }
         }
         return self._req_session.post(schema_url, data=str(add_field_payload).encode('utf-8'))

Reply via email to