This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 883dddf75350a08953d4fb16715b4e404ece6d8a
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Oct 22 16:38:08 2020 -0700

    Properly scans S3, still needs S3 signature fun
---
 .../services/CollectionProcessor.py              |  7 ++---
 .../services/CollectionWatcher.py                | 14 +++++++---
 .../collection_manager/services/S3Observer.py    | 18 ++++++++-----
 .../services/history_manager/IngestionHistory.py | 31 ++++++++++++++--------
 .../tests/services/test_S3Observer.py            |  8 ++++++
 5 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ab8ce95..89d413b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -2,6 +2,7 @@ import logging
 import os.path
 from glob import glob
 from typing import Dict
+from datetime import datetime

 import yaml
 from collection_manager.entities import Collection
@@ -23,7 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}

-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -34,7 +35,7 @@ class CollectionProcessor:
             return

         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)

         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -54,7 +55,7 @@ class CollectionProcessor:

         dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        await history_manager.push(granule)
+        await history_manager.push(granule, modified_time)

     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 87b1ac3..f885b1c 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import asyncio
+from datetime import datetime
 from collection_manager.entities.Collection import CollectionStorageType
-from collection_manager.services.S3Observer import S3Observer
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
@@ -39,7 +40,7 @@ class CollectionWatcher:
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)

         if s3:
-            self._observer = S3Observer('nexus-ingest')
+            self._observer = S3Observer('nexus-ingest', initial_scan=True)
         else:
             self._observer = Observer()

@@ -197,9 +198,14 @@ class _GranuleEventHandler(FileSystemEventHandler):
             self._handle_event(event)

     def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
             try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = event.modified_time
+                    else:
+                        modified_time = os.path.getmtime(path)
+                    self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 376a907..d204890 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -1,4 +1,5 @@
 import asyncio
+from urllib.parse import urlparse
 import datetime
 import os
 import time
@@ -14,6 +15,7 @@ os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
 @dataclass
 class S3Event:
     src_path: str
+    modified_time: datetime.datetime


 class S3FileModifiedEvent(S3Event):
@@ -48,7 +50,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)

-    def schedule(self,event_handler, path: str):
+    def schedule(self, event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -90,9 +92,9 @@ class S3Observer:
                 file_is_new = file not in self._cache

                 if file_is_new:
-                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, modified_time=modified_date))
                 else:
-                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, modified_time=modified_date))

         self._cache = new_cache
         self._has_polled = True
@@ -104,9 +106,9 @@ class S3Observer:
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)

-            # we need the key without the bucket name
-            async for file in bucket.objects.filter(Prefix=path):
-                new_cache[file.key] = await file.last_modified
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await file.last_modified

         end = time.perf_counter()
         duration = end - start
@@ -114,6 +116,10 @@

         return new_cache

+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+

 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index 231d179..ea50ffb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -6,6 +7,8 @@ from datetime import datetime
 from enum import Enum
 from typing import Optional

+from botocore.compat import filter_ssl_warnings
+
 logger = logging.getLogger(__name__)

 BLOCK_SIZE = 65536
@@ -37,27 +40,26 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None

-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_time: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
-        file_modified_date = os.path.getmtime(file_path)

         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = file_modified_date
+            self._latest_ingested_file_update = modified_time
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)

         await self._save_latest_timestamp()

     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_time: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -74,14 +76,22 @@
         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        file_modified_date = os.path.getmtime(file_path)
-        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED

+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of just the full path?
+        # The only reason this is here right now is for backwards compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
     def _latest_ingested_mtime(self) -> Optional[datetime]:
         """
         Return the modified time of the most recently modified file that was ingested.
@@ -98,8 +108,7 @@
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)

         return signature == await self._get_signature(file_name)
diff --git a/collection_manager/tests/services/test_S3Observer.py b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))
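
A quick sketch of the path handling above, since the two new helpers mirror each other: S3Observer._get_object_key reduces an s3:// URL to the bare object key for the Prefix filter, while IngestionHistory._get_standardized_path applies the same rule to s3 paths but keeps the legacy basename behavior for local files. This is not part of the commit, just a minimal standalone rendering assuming granule paths arrive either as s3:// URLs or as plain filesystem paths:

    import os
    from urllib.parse import urlparse

    def get_object_key(full_path: str) -> str:
        # 's3://test-bucket/test_dir/object.nc' -> 'test_dir/object.nc'
        # (scheme and bucket name dropped, surrounding slashes stripped)
        return urlparse(full_path).path.strip("/")

    def get_standardized_path(file_path: str) -> str:
        file_path = file_path.strip()
        # s3 paths keep the full object key; local paths collapse to the
        # basename for backwards compatibility with existing history records.
        if urlparse(file_path).scheme == 's3':
            return urlparse(file_path).path.strip("/")
        return os.path.basename(file_path)

    assert get_object_key('s3://test-bucket/test_dir/object.nc') == 'test_dir/object.nc'
    assert get_standardized_path('s3://test-bucket/test_dir/object.nc') == 'test_dir/object.nc'
    assert get_standardized_path('/data/granules/object.nc') == 'object.nc'

Because _get_file_cache now keys the poll cache by the full URL (f"s3://{file.bucket_name}/{file.key}"), event src_path values and ingestion-history keys stay consistent between S3 and local storage.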

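A second sketch, for the modified-time plumbing: _GranuleEventHandler._handle_event now reads the timestamp off the event (S3 last-modified for an S3Event, os.path.getmtime otherwise) and threads it through process_granule into get_granule_status, which no longer stats the path itself. Below is a standalone rendering of the status decision; _in_time_range is not shown in this diff, so its open-ended, inclusive-bounds semantics here are an assumption:

    from datetime import datetime
    from enum import Enum

    class GranuleStatus(Enum):
        DESIRED_FORWARD_PROCESSING = 1
        DESIRED_HISTORICAL = 2
        UNDESIRED = 3

    def in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None) -> bool:
        # Assumed semantics: both bounds optional and inclusive.
        return (start_date is None or date >= start_date) and (end_date is None or date <= end_date)

    def granule_status(modified_time: datetime,
                       latest_ingested_mtime: datetime,
                       already_ingested: bool,
                       date_from: datetime = None,
                       date_to: datetime = None) -> GranuleStatus:
        # Modified after the newest already-ingested file -> forward-processing.
        if in_time_range(modified_time, start_date=latest_ingested_mtime):
            return GranuleStatus.DESIRED_FORWARD_PROCESSING
        # Inside the configured historical window and not previously ingested -> historical.
        if in_time_range(modified_time, date_from, date_to) and not already_ingested:
            return GranuleStatus.DESIRED_HISTORICAL
        return GranuleStatus.UNDESIRED

One caveat visible in the diff: the local branch of _handle_event passes os.path.getmtime(path), a float POSIX timestamp, while the S3 branch passes a datetime, so these comparisons assume callers normalize to a single type.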