This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7602e48  CM improvements (#93)
7602e48 is described below

commit 7602e48da72b2f351b42debed8674bf10d2c28c3
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 10:03:21 2024 -0700

    CM improvements (#93)
    
    * Suppress overly verbose loggers instead of setting all loggers to INFO
    
    * Collection manager improvements
    
    - Yield S3 Observer task as it's creating update events so that those 
events don't block until the iteration is complete
    - Improved logging
    - Log level settable by environment var LOG_LEVEL. Can be a level name (i.e., 
INFO, WARNING, etc.) or a numerical value
    
    * Changelog
    
    * Moved changelog entry to correct section
    
    ---------
    
    Co-authored-by: rileykk <[email protected]>
---
 CHANGELOG.md                                       | 16 ++++++++++--
 .../collection_manager/entities/Collection.py      |  4 ++-
 collection_manager/collection_manager/main.py      | 30 +++++++++++++++++++---
 .../services/CollectionProcessor.py                |  1 +
 .../services/CollectionWatcher.py                  |  2 ++
 .../collection_manager/services/S3Observer.py      | 23 ++++++++++++++++-
 collection_manager/requirements.txt                |  1 -
 7 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ced6fca..0058cfe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,19 @@ All notable changes to this project will be documented in this 
file.
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+### Added
+### Changed
+- Improved Collection Manager logging
+  - Inhibited overly verbose loggers
+  - Logging verbosity configurable by environment
+- Improved concurrency for monitoring S3 collections
+### Deprecated
+### Removed
+### Fixed
+- SDAP-512: Fixed Granule Ingester not closing connections to 
Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring 
a restart
+### Security
+
 ## [1.2.0] - 2023-11-22
 ### Added
 - SDAP-477: Added preprocessor to properly shape incoming data
@@ -15,8 +28,7 @@ and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0
 ### Removed
 - SDAP-501: Updated dependencies to remove `chardet`
 ### Fixed
-- SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build 
installs nexusproto through PyPI instead of building from source. A build arg 
`BUILD_NEXUSPROTO` was defined to allow building from source if desired/
-- SDAP-512: Fixed Granule Ingester not closing connections to 
Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring 
a restart
+- SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build 
installs nexusproto through PyPI instead of building from source. A build arg 
`BUILD_NEXUSPROTO` was defined to allow building from source if desired
 ### Security
 
 ## [1.1.0] - 2023-04-26
diff --git a/collection_manager/collection_manager/entities/Collection.py 
b/collection_manager/collection_manager/entities/Collection.py
index 333f454..3554f34 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -76,7 +76,9 @@ class Collection:
         """
         Accepting either `variable` or `variables` from the configmap
         """
-        logger.debug(f'incoming properties dict: {properties}')
+        # Inhibiting this for now...
+        # logger.debug(f'Incoming properties dict: {properties}')
+
         try:
             date_to = datetime.fromisoformat(properties['to']) if 'to' in 
properties else None
             date_from = datetime.fromisoformat(properties['from']) if 'from' 
in properties else None
diff --git a/collection_manager/collection_manager/main.py 
b/collection_manager/collection_manager/main.py
index 8eead68..452fa25 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -24,8 +24,32 @@ from collection_manager.services.history_manager import (
     FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
     md5sum_from_filepath)
 
-logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] 
[%(name)s::%(lineno)d] %(message)s")
-logging.getLogger("pika").setLevel(logging.WARNING)
+
+log_level = os.getenv('LOG_LEVEL', 'INFO')
+
+if log_level in ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']:
+    log_level = getattr(logging, log_level)
+else:
+    try:
+        log_level = int(log_level)
+    except:
+        log_level = logging.INFO
+
+logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s] 
[%(name)s::%(lineno)d] %(message)s")
+
+SUPPRESS = [
+    'botocore',
+    's3transfer',
+    'urllib3',
+    'pika',
+    'boto3',
+    'aioboto3'
+]
+
+for logger_name in SUPPRESS:
+    logging.getLogger(logger_name).setLevel(logging.WARNING)
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -112,4 +136,4 @@ async def main():
 
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    asyncio.run(main(), debug=False)
diff --git 
a/collection_manager/collection_manager/services/CollectionProcessor.py 
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 6c129c7..6b8f243 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -46,6 +46,7 @@ class CollectionProcessor:
         :return: None
         """
         if not self._file_supported(granule):
+            logger.warning(f'Tried to process unsupported file {granule}. 
Skipping.')
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
diff --git 
a/collection_manager/collection_manager/services/CollectionWatcher.py 
b/collection_manager/collection_manager/services/CollectionWatcher.py
index 063e823..c28ff6a 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -96,6 +96,8 @@ class CollectionWatcher:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
             self._collections_by_dir.clear()
+            logger.info('Refreshing collection config')
+
             for collection_dict in collections_yaml['collections']:
                 try:
                     collection = Collection.from_dict(collection_dict)
diff --git a/collection_manager/collection_manager/services/S3Observer.py 
b/collection_manager/collection_manager/services/S3Observer.py
index 866e43a..5ee84d6 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -20,9 +20,12 @@ import os
 import time
 from dataclasses import dataclass
 from typing import Set, Dict, Optional, Callable, Awaitable
+import logging
 
 import aioboto3
 
+logger = logging.getLogger(__name__)
+
 
 @dataclass
 class S3Event:
@@ -102,8 +105,14 @@ class S3Observer:
             watch_index = {**watch_index, **new_index}
         difference = set(new_cache.items()) - set(self._cache.items())
 
+        logger.info(f'S3 Poll completed; creating events for {len(difference)} 
found files')
+        logger.debug(f'(has_polled = {self._has_polled} || init_scan = 
{self._initial_scan}) = {self._has_polled or self._initial_scan}')
+
         if self._has_polled or self._initial_scan:
-            for (file, modified_date) in difference:
+            for i, (file, modified_date) in enumerate(difference):
+                if i % 100 == 0:
+                    logger.debug(f'Iterated over {i} items in diff')
+
                 watch = watch_index[file]
                 file_is_new = file not in self._cache
 
@@ -112,6 +121,10 @@ class S3Observer:
                 else:
                     
watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, 
modified_time=modified_date))
 
+                await asyncio.sleep(0)
+
+        logger.info('All S3 events for this poll have been created')
+
         self._cache = new_cache
         self._has_polled = True
 
@@ -122,12 +135,20 @@ class S3Observer:
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
 
+            n_keys = 0
+
             object_key = S3Observer._get_object_key(path)
+            logger.debug(f'Listing objects for bucket {self._bucket} under 
path {object_key}')
             async for file in bucket.objects.filter(Prefix=object_key):
+                n_keys += 1
                 new_cache[f"s3://{file.bucket_name}/{file.key}"] = await 
file.last_modified
+
         end = time.perf_counter()
         duration = end - start
 
+        logger.info(f'Finished listing objects for bucket {self._bucket} under 
path {object_key}: Found {n_keys} in '
+                     f'{duration} seconds')
+
         return new_cache
 
     def _get_object_key(full_path: str):
diff --git a/collection_manager/requirements.txt 
b/collection_manager/requirements.txt
index fbace6e..d768fec 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 PyYAML==5.3.1
-pystache==0.5.4
 pysolr==3.9.0
 watchdog==0.10.2
 requests==2.31.0

Reply via email to