This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/s3-support by this push:
     new 100a9bb  fixed scanning weirdness
100a9bb is described below

commit 100a9bbebe4e4b7c21c81c7a53f289bedf63d701
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700

    fixed scanning weirdness
---
 .../services/CollectionWatcher.py                  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
 import asyncio
 from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
 from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
 from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
                                                     CollectionConfigParsingError,
                                                     ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
 
-        if type(self._observer) == S3Observer:
+        if isinstance(self._observer, S3Observer):
             await self._observer.start()
         else:
             self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = os.path.getmtime(granule_path)
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    files_owned = glob(collection.path, recursive=True)
-                    for granule in files_owned:
-                        await self._granule_updated_callback(granule, collection)
-
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                if type(self._observer) == S3Observer:
+                if isinstance(self._observer, S3Observer):
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
                 else:
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
                     if isinstance(event, S3Event):
                         modified_time = event.modified_time
                     else:
-                        modified_time = os.path.getmtime(path)
+                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return

Reply via email to