This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git
The following commit(s) were added to refs/heads/develop by this push:
new 7602e48 CM improvements (#93)
7602e48 is described below
commit 7602e48da72b2f351b42debed8674bf10d2c28c3
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 10:03:21 2024 -0700
CM improvements (#93)
* Suppress overly verbose loggers instead of all loggers to INFO
* Collection manager improvements
- Yield S3 Observer task as it's creating update events so that those
events don't block until the iteration is complete
- Improved logging
- Log level settable by environment var LOG_LEVEL. Could be level name (i.e.,
INFO, WARNING, etc.) or numerical value
* Changelog
* Moved changelog entry to correct section
---------
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 16 ++++++++++--
.../collection_manager/entities/Collection.py | 4 ++-
collection_manager/collection_manager/main.py | 30 +++++++++++++++++++---
.../services/CollectionProcessor.py | 1 +
.../services/CollectionWatcher.py | 2 ++
.../collection_manager/services/S3Observer.py | 23 ++++++++++++++++-
collection_manager/requirements.txt | 1 -
7 files changed, 69 insertions(+), 8 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ced6fca..0058cfe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,19 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+### Added
+### Changed
+- Improved Collection Manager logging
+ - Inhibited overly verbose loggers
+ - Logging verbosity configurable by environment
+- Improved concurrency for monitoring S3 collections
+### Deprecated
+### Removed
+### Fixed
+- SDAP-512: Fixed Granule Ingester not closing connections to
Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring
a restart
+### Security
+
## [1.2.0] - 2023-11-22
### Added
- SDAP-477: Added preprocessor to properly shape incoming data
@@ -15,8 +28,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
### Removed
- SDAP-501: Updated dependencies to remove `chardet`
### Fixed
-- SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build
installs nexusproto through PyPI instead of building from source. A build arg
`BUILD_NEXUSPROTO` was defined to allow building from source if desired/
-- SDAP-512: Fixed Granule Ingester not closing connections to
Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring
a restart
+- SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build
installs nexusproto through PyPI instead of building from source. A build arg
`BUILD_NEXUSPROTO` was defined to allow building from source if desired
### Security
## [1.1.0] - 2023-04-26
diff --git a/collection_manager/collection_manager/entities/Collection.py
b/collection_manager/collection_manager/entities/Collection.py
index 333f454..3554f34 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -76,7 +76,9 @@ class Collection:
"""
Accepting either `variable` or `variables` from the configmap
"""
- logger.debug(f'incoming properties dict: {properties}')
+ # Inhibiting this for now...
+ # logger.debug(f'Incoming properties dict: {properties}')
+
try:
date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
date_from = datetime.fromisoformat(properties['from']) if 'from'
in properties else None
diff --git a/collection_manager/collection_manager/main.py
b/collection_manager/collection_manager/main.py
index 8eead68..452fa25 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -24,8 +24,32 @@ from collection_manager.services.history_manager import (
FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
md5sum_from_filepath)
-logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]
[%(name)s::%(lineno)d] %(message)s")
-logging.getLogger("pika").setLevel(logging.WARNING)
+
+log_level = os.getenv('LOG_LEVEL', 'INFO')
+
+if log_level in ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']:
+ log_level = getattr(logging, log_level)
+else:
+ try:
+ log_level = int(log_level)
+ except:
+ log_level = logging.INFO
+
+logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s]
[%(name)s::%(lineno)d] %(message)s")
+
+SUPPRESS = [
+ 'botocore',
+ 's3transfer',
+ 'urllib3',
+ 'pika',
+ 'boto3',
+ 'aioboto3'
+]
+
+for logger_name in SUPPRESS:
+ logging.getLogger(logger_name).setLevel(logging.WARNING)
+
+
logger = logging.getLogger(__name__)
@@ -112,4 +136,4 @@ async def main():
if __name__ == "__main__":
- asyncio.run(main())
+ asyncio.run(main(), debug=False)
diff --git
a/collection_manager/collection_manager/services/CollectionProcessor.py
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 6c129c7..6b8f243 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -46,6 +46,7 @@ class CollectionProcessor:
:return: None
"""
if not self._file_supported(granule):
+ logger.warning(f'Tried to process unsupported file {granule}.
Skipping.')
return
history_manager = self._get_history_manager(collection.dataset_id)
diff --git
a/collection_manager/collection_manager/services/CollectionWatcher.py
b/collection_manager/collection_manager/services/CollectionWatcher.py
index 063e823..c28ff6a 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -96,6 +96,8 @@ class CollectionWatcher:
with open(self._collections_path, 'r') as f:
collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
self._collections_by_dir.clear()
+ logger.info('Refreshing collection config')
+
for collection_dict in collections_yaml['collections']:
try:
collection = Collection.from_dict(collection_dict)
diff --git a/collection_manager/collection_manager/services/S3Observer.py
b/collection_manager/collection_manager/services/S3Observer.py
index 866e43a..5ee84d6 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -20,9 +20,12 @@ import os
import time
from dataclasses import dataclass
from typing import Set, Dict, Optional, Callable, Awaitable
+import logging
import aioboto3
+logger = logging.getLogger(__name__)
+
@dataclass
class S3Event:
@@ -102,8 +105,14 @@ class S3Observer:
watch_index = {**watch_index, **new_index}
difference = set(new_cache.items()) - set(self._cache.items())
+ logger.info(f'S3 Poll completed; creating events for {len(difference)}
found files')
+ logger.debug(f'(has_polled = {self._has_polled} || init_scan =
{self._initial_scan}) = {self._has_polled or self._initial_scan}')
+
if self._has_polled or self._initial_scan:
- for (file, modified_date) in difference:
+ for i, (file, modified_date) in enumerate(difference):
+ if i % 100 == 0:
+ logger.debug(f'Iterated over {i} items in diff')
+
watch = watch_index[file]
file_is_new = file not in self._cache
@@ -112,6 +121,10 @@ class S3Observer:
else:
watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file,
modified_time=modified_date))
+ await asyncio.sleep(0)
+
+ logger.info('All S3 events for this poll have been created')
+
self._cache = new_cache
self._has_polled = True
@@ -122,12 +135,20 @@ class S3Observer:
async with aioboto3.resource("s3") as s3:
bucket = await s3.Bucket(self._bucket)
+ n_keys = 0
+
object_key = S3Observer._get_object_key(path)
+ logger.debug(f'Listing objects for bucket {self._bucket} under
path {object_key}')
async for file in bucket.objects.filter(Prefix=object_key):
+ n_keys += 1
new_cache[f"s3://{file.bucket_name}/{file.key}"] = await
file.last_modified
+
end = time.perf_counter()
duration = end - start
+ logger.info(f'Finished listing objects for bucket {self._bucket} under
path {object_key}: Found {n_keys} in '
+ f'{duration} seconds')
+
return new_cache
def _get_object_key(full_path: str):
diff --git a/collection_manager/requirements.txt
b/collection_manager/requirements.txt
index fbace6e..d768fec 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -14,7 +14,6 @@
# limitations under the License.
PyYAML==5.3.1
-pystache==0.5.4
pysolr==3.9.0
watchdog==0.10.2
requests==2.31.0