This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a7e3488093552aacced4a97d67ed717c458cb126
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700

    fixed scanning weirdness
---
 .../services/CollectionWatcher.py | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
 import asyncio
 from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
 from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
 from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
                                                     CollectionConfigParsingError,
                                                     ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
                                        wait_time=self._collections_refresh_interval,
                                        func=self._reload_and_reschedule)
 
-        if type(self._observer) == S3Observer:
+        if isinstance(self._observer, S3Observer):
             await self._observer.start()
         else:
             self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = os.path.getmtime(granule_path)
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    files_owned = glob(collection.path, recursive=True)
-                    for granule in files_owned:
-                        await self._granule_updated_callback(granule, collection)
-
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
             self._unschedule_watches()
             self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                if type(self._observer) == S3Observer:
+                if isinstance(self._observer, S3Observer):
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
                 else:
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
                 if isinstance(event, S3Event):
                     modified_time = event.modified_time
                 else:
-                    modified_time = os.path.getmtime(path)
+                    modified_time = datetime.fromtimestamp(os.path.getmtime(path))
                 self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
