This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 31bff5e6c524c213ae33fe1af39f18f4da2969cc
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Nov 12 14:59:34 2020 -0800

    SDAP-288: S3 ingestion support (#24)
---
 .../collection_manager/entities/Collection.py      |  33 +++--
 .../collection_manager/entities/__init__.py        |   1 +
 collection_manager/collection_manager/main.py      |  22 +++-
 .../services/CollectionProcessor.py                |  28 ++--
 .../services/CollectionWatcher.py                  |  90 ++++++++-----
 .../collection_manager/services/S3Observer.py      | 145 +++++++++++++++++++++
 .../collection_manager/services/__init__.py        |   1 +
 .../history_manager/FileIngestionHistory.py        |   3 +-
 .../services/history_manager/IngestionHistory.py   |  79 ++++++-----
 .../history_manager/SolrIngestionHistory.py        |   9 +-
 collection_manager/docker/Dockerfile               |   4 +-
 collection_manager/docker/entrypoint.sh            |   5 +-
 collection_manager/requirements.txt                |   6 +-
 collection_manager/setup.py                        |   2 +-
 .../history_manager/test_FileIngestionHistory.py   |   6 +-
 .../tests/services/test_S3Observer.py              |   8 ++
 granule_ingester/conda-requirements.txt            |   1 -
 granule_ingester/docker/Dockerfile                 |   2 +
 granule_ingester/requirements.txt                  |   5 +-
 19 files changed, 334 insertions(+), 116 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py 
b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,13 +1,20 @@
 import os
+from urllib.parse import urlparse
 from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
 from typing import List, Optional
+from enum import Enum
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
 
+class CollectionStorageType(Enum):
+    LOCAL = 1
+    S3 = 2
+
+
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
@@ -39,20 +46,28 @@ class Collection:
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
+    def storage_type(self):
+        if urlparse(self.path).scheme == 's3':
+            return CollectionStorageType.S3
+        else:
+            return CollectionStorageType.LOCAL
+
     def directory(self):
-        if os.path.isdir(self.path):
+        if urlparse(self.path).scheme == 's3':
+            return self.path
+        elif os.path.isdir(self.path):
             return self.path
         else:
             return os.path.dirname(self.path)
 
     def owns_file(self, file_path: str) -> bool:
-        if os.path.isdir(file_path):
-            raise IsADirectoryError()
-
-        if os.path.isdir(self.path):
-            return os.path.dirname(file_path) == self.path
+        if urlparse(file_path).scheme == 's3':
+            return file_path.find(self.path) == 0
         else:
-            return fnmatch(file_path, self.path)
+            if os.path.isdir(file_path):
+                raise IsADirectoryError()
 
-    def files_owned(self) -> List[str]:
-        return glob(self.path, recursive=True)
+            if os.path.isdir(self.path):
+                return os.path.dirname(file_path) == self.path
+            else:
+                return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/entities/__init__.py 
b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
 from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py 
b/collection_manager/collection_manager/main.py
index cbe22f9..b80ae7c 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -3,8 +3,11 @@ import asyncio
 import logging
 import os
 
-from collection_manager.services import CollectionProcessor, 
CollectionWatcher, MessagePublisher
-from collection_manager.services.history_manager import 
SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
+from collection_manager.services import (CollectionProcessor,
+                                         CollectionWatcher, MessagePublisher)
+from collection_manager.services.history_manager import (
+    FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
+    md5sum_from_filepath)
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
@@ -51,6 +54,9 @@ def get_args() -> argparse.Namespace:
                         default='30',
                         metavar="INTERVAL",
                         help='Number of seconds after which to reload the 
collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules 
are stored. If this option is set, then all collections to be scanned must have 
their granules on S3, not the local filesystem.')
 
     return parser.parse_args()
 
@@ -59,10 +65,14 @@ async def main():
     try:
         options = get_args()
 
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
+
         if options.history_path:
-            history_manager_builder = 
FileIngestionHistoryBuilder(history_path=options.history_path)
+            history_manager_builder = 
FileIngestionHistoryBuilder(history_path=options.history_path,
+                                                                  
signature_fun=signature_fun)
         else:
-            history_manager_builder = 
SolrIngestionHistoryBuilder(solr_url=options.history_url)
+            history_manager_builder = 
SolrIngestionHistoryBuilder(solr_url=options.history_url,
+                                                                  
signature_fun=signature_fun)
         async with MessagePublisher(host=options.rabbitmq_host,
                                     username=options.rabbitmq_username,
                                     password=options.rabbitmq_password,
@@ -70,9 +80,9 @@ async def main():
             collection_processor = 
CollectionProcessor(message_publisher=publisher,
                                                        
history_manager_builder=history_manager_builder)
             collection_watcher = 
CollectionWatcher(collections_path=options.collections_path,
-                                                   
collection_updated_callback=collection_processor.process_collection,
                                                    
granule_updated_callback=collection_processor.process_granule,
-                                                   
collections_refresh_interval=int(options.refresh))
+                                                   
collections_refresh_interval=int(options.refresh),
+                                                   s3_bucket=options.s3_bucket)
 
             await collection_watcher.start_watching()
             while True:
diff --git 
a/collection_manager/collection_manager/services/CollectionProcessor.py 
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 975f50c..96c461e 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,16 @@
 import logging
 import os.path
+from glob import glob
 from typing import Dict
-import yaml
+from datetime import datetime
 
+import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, 
GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import 
IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+                                                         IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+    IngestionHistoryBuilder
 
 logger = logging.getLogger(__name__)
 
@@ -20,16 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_collection(self, collection: Collection):
-        """
-        Given a Collection, detect new granules that need to be ingested and 
publish RabbitMQ messages for each.
-        :param collection: A Collection definition
-        :return: None
-        """
-        for granule in collection.files_owned():
-            await self.process_granule(granule, collection)
-
-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: int, 
collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a 
RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -40,7 +35,10 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, 
collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule,
+                                                                  
modified_time,
+                                                                  
collection.date_from,
+                                                                  
collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for 
forward-processing ingestion "
@@ -60,7 +58,7 @@ class CollectionProcessor:
 
         dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, 
priority=use_priority)
-        await history_manager.push(granule)
+        await history_manager.push(granule, modified_time)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git 
a/collection_manager/collection_manager/services/CollectionWatcher.py 
b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..b1aaf4e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,17 +1,22 @@
 import asyncio
+from datetime import datetime
+from collection_manager.entities.Collection import CollectionStorageType, 
Collection
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
-from typing import Awaitable, Callable, Dict, Optional, Set
+from glob import glob
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
-    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
-    ConflictingPathCollectionError, MissingValueCollectionError,
-    RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from collection_manager.entities.exceptions import 
(CollectionConfigFileNotFoundError,
+                                                    
CollectionConfigParsingError,
+                                                    
ConflictingPathCollectionError,
+                                                    
MissingValueCollectionError,
+                                                    
RelativePathCollectionError,
+                                                    RelativePathError)
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, 
FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
 logger = logging.getLogger(__name__)
@@ -21,19 +26,18 @@ logger.setLevel(logging.DEBUG)
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], 
Awaitable],
                  granule_updated_callback: Callable[[str, Collection], 
Awaitable],
+                 s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an 
absolute path.")
 
         self._collections_path = collections_path
-        self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-        self._observer = Observer()
+        self._observer = S3Observer(s3_bucket, initial_scan=True) if s3_bucket 
else Observer()
 
         self._granule_watches = set()
 
@@ -48,7 +52,11 @@ class CollectionWatcher:
         await self._run_periodically(loop=loop,
                                      
wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-        self._observer.start()
+
+        if isinstance(self._observer, S3Observer):
+            await self._observer.start()
+        else:
+            self._observer.start()
 
     def _collections(self) -> Set[Collection]:
         """
@@ -58,11 +66,15 @@ class CollectionWatcher:
         return {collection for collections in 
self._collections_by_dir.values() for collection in collections}
 
     def _validate_collection(self, collection: Collection):
-        directory = collection.directory()
-        if not os.path.isabs(directory):
-            raise RelativePathCollectionError(collection=collection)
-        if directory == os.path.dirname(self._collections_path):
-            raise ConflictingPathCollectionError(collection=collection)
+        if collection.storage_type() == CollectionStorageType.S3:
+            # do some S3 path validation here
+            return
+        else:
+            directory = collection.directory()
+            if not os.path.isabs(directory):
+                raise RelativePathCollectionError(collection=collection)
+            if directory == os.path.dirname(self._collections_path):
+                raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -100,15 +112,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: 
List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = int(os.path.getmtime(granule_path))
+                await self._granule_updated_callback(granule_path, 
modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} 
seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} 
collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    await self._collection_updated_callback(collection)
-                logger.info(f"Finished scanning files in {time.perf_counter() 
- start} seconds.")
+                # For S3 collections, the S3Observer will report as new any 
files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await 
self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -128,7 +148,10 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                
self._granule_watches.add(self._observer.schedule(granule_event_handler, 
directory, recursive=True))
+                if isinstance(self._observer, S3Observer):
+                    
self._granule_watches.add(self._observer.schedule(granule_event_handler, 
directory))
+                else:
+                    
self._granule_watches.add(self._observer.schedule(granule_event_handler, 
directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in 
collections])
                 logger.error(f"Granule directory {directory} does not exist. 
Ignoring {bad_collection_names}.")
@@ -168,18 +191,21 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_created(self, event):
         super().on_created(event)
-        for collection in self._collections_for_dir:
-            try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, 
collection))
-            except IsADirectoryError:
-                pass
+        self._handle_event(event)
 
     def on_modified(self, event):
         super().on_modified(event)
-        if os.path.isdir(event.src_path):
-            return
+        self._handle_event(event)
 
+    def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, 
collection))
+            try:
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = int(event.modified_time.timestamp())
+                    else:
+                        modified_time = int(os.path.getmtime(path))
+                    self._loop.create_task(self._callback(path, modified_time, 
collection))
+            except IsADirectoryError:
+                return
diff --git a/collection_manager/collection_manager/services/S3Observer.py 
b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..87458a9
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,145 @@
+import asyncio
+from urllib.parse import urlparse
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
+
+@dataclass
+class S3Event:
+    src_path: str
+    modified_time: datetime.datetime
+
+
+class S3FileModifiedEvent(S3Event):
+    pass
+
+
+class S3FileCreatedEvent(S3Event):
+    pass
+
+
+class S3Watch(object):
+    def __init__(self, path: str, event_handler) -> None:
+        self.path = path
+        self.event_handler = event_handler
+
+
+class S3Observer:
+
+    def __init__(self, bucket, initial_scan=False) -> None:
+        self._bucket = bucket
+        self._cache: Dict[str, datetime.datetime] = {}
+        self._initial_scan = initial_scan
+        self._watches: Set[S3Watch] = set()
+
+        self._has_polled = False
+
+    async def start(self):
+        await self._run_periodically(loop=None,
+                                     wait_time=30,
+                                     func=self._poll)
+
+    def unschedule(self, watch: S3Watch):
+        self._watches.remove(watch)
+
+    def schedule(self, event_handler, path: str):
+        watch = S3Watch(path=path, event_handler=event_handler)
+        self._watches.add(watch)
+        return watch
+
+    @classmethod
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[[any], Awaitable],
+                                *args,
+                                **kwargs):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current 
running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the async function that will be awaited
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, 
cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
+    async def _poll(self):
+        new_cache = {}
+        watch_index = {}
+
+        for watch in self._watches:
+            new_cache_for_watch = await self._get_s3_files(watch.path)
+            new_index = {file: watch for file in new_cache_for_watch}
+
+            new_cache = {**new_cache, **new_cache_for_watch}
+            watch_index = {**watch_index, **new_index}
+        difference = set(new_cache.items()) - set(self._cache.items())
+
+        if self._has_polled or self._initial_scan:
+            for (file, modified_date) in difference:
+                watch = watch_index[file]
+                file_is_new = file not in self._cache
+
+                if file_is_new:
+                    
watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, 
modified_time=modified_date))
+                else:
+                    
watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, 
modified_time=modified_date))
+
+        self._cache = new_cache
+        self._has_polled = True
+
+    async def _get_s3_files(self, path: str):
+        new_cache = {}
+
+        start = time.perf_counter()
+        async with aioboto3.resource("s3") as s3:
+            bucket = await s3.Bucket(self._bucket)
+
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await 
file.last_modified
+        end = time.perf_counter()
+        duration = end - start
+
+        print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+        return new_cache
+
+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+
+
+async def test():
+    observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+    handler = Handler()
+    observer.schedule(handler, 'avhrr/2012')
+    observer.schedule(handler, 'avhrr/2013')
+
+    await observer.start()
+
+    while True:
+        try:
+            await asyncio.sleep(1)
+        except KeyboardInterrupt:
+            return
+
+
+class Handler:
+    def on_created(self, event: S3Event):
+        print(f"File created: {event.src_path}")
+
+    def on_modified(self, event: S3Event):
+        print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+    asyncio.run(test())
diff --git a/collection_manager/collection_manager/services/__init__.py 
b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
 from .CollectionProcessor import CollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git 
a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
 
b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index ffa065f..cf92997 100644
--- 
a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ 
b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -4,7 +4,6 @@ from pathlib import Path
 
 from collection_manager.services.history_manager.IngestionHistory import 
IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import 
IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import 
md5sum_from_filepath
 
 logger = logging.getLogger(__name__)
 
@@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory):
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, 
f'{dataset_id}.csv')
-        self._signature_fun = md5sum_from_filepath if signature_fun is None 
else signature_fun
+        self._signature_fun = signature_fun
         self._history_dict = {}
         self._load_history_dict()
 
diff --git 
a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
 
b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ef73ccb..7f33c79 100644
--- 
a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ 
b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -6,6 +7,8 @@ from datetime import datetime
 from enum import Enum
 from typing import Optional
 
+from botocore.compat import filter_ssl_warnings
+
 logger = logging.getLogger(__name__)
 
 BLOCK_SIZE = 65536
@@ -35,49 +38,28 @@ class GranuleStatus(Enum):
 
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update = None
+    _latest_ingested_file_update: int = None
 
-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_timestamp: int):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
+        signature = self._signature_fun(file_path) if self._signature_fun else 
str(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = os.path.getmtime(file_path)
+            self._latest_ingested_file_update = modified_timestamp
         else:
-            self._latest_ingested_file_update = 
max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+            self._latest_ingested_file_update = 
max(self._latest_ingested_file_update, modified_timestamp)
 
         await self._save_latest_timestamp()
 
-    def latest_ingested_mtime(self) -> Optional[datetime]:
-        """
-        Return the modified time of the most recently modified file that was 
ingested.
-        :return: A datetime or None
-        """
-        if self._latest_ingested_file_update:
-            return datetime.fromtimestamp(self._latest_ingested_file_update)
-        else:
-            return None
-
-    async def already_ingested(self, file_path: str) -> bool:
-        """
-        Return a boolean indicating whether the specified file has already 
been ingested, based on its signature.
-        :param file_path: The full path of a file to search for in the history.
-        :return: A boolean indicating whether this file has already been 
ingested or not
-        """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
-        return signature == await self._get_signature(file_name)
-
     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_timestamp: int,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -94,13 +76,43 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, 
date_from=self.latest_ingested_mtime()):
+        signature = self._signature_fun(file_path) if self._signature_fun else 
str(modified_timestamp)
+
+        if self._in_time_range(modified_timestamp, 
start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await 
self.already_ingested(file_path):
+        elif self._in_time_range(modified_timestamp, date_from, date_to) and 
not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of 
just the full path?
+        # The only reason this is here right now is for backwards 
compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
+    def _latest_ingested_mtime(self) -> Optional[datetime]:
+        """
+        Return the modified time of the most recently modified file that was 
ingested.
+        :return: A datetime or None
+        """
+        if self._latest_ingested_file_update:
+            return datetime.fromtimestamp(self._latest_ingested_file_update)
+        else:
+            return None
+
+    async def _already_ingested(self, file_path: str, signature) -> bool:
+        """
+        Return a boolean indicating whether the specified file has already 
been ingested, based on its signature.
+        :param file_path: The full path of a file to search for in the history.
+        :return: A boolean indicating whether this file has already been 
ingested or not
+        """
+        file_name = IngestionHistory._get_standardized_path(file_path)
+        return signature == await self._get_signature(file_name)
+
     @abstractmethod
     async def _save_latest_timestamp(self):
         pass
@@ -114,15 +126,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(file, date_from: datetime = None, date_to: datetime = 
None):
+    def _in_time_range(timestamp: int, start_date: datetime = None, end_date: 
datetime = None):
         """
         :param file: file path as a string
         :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and 
ts_to. False otherwise
         """
-        file_modified_time = os.path.getmtime(file)
-        is_after_from = date_from.timestamp() < file_modified_time if 
date_from else True
-        is_before_to = date_to.timestamp() > file_modified_time if date_to 
else True
+        is_after_from = int(start_date.timestamp()) < timestamp if start_date 
else True
+        is_before_to = int(end_date.timestamp()) > timestamp if end_date else 
True
 
         return is_after_from and is_before_to
diff --git 
a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
 
b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 59f5cd7..c6d26a5 100644
--- 
a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ 
b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -3,11 +3,8 @@ import logging
 
 import pysolr
 import requests
-
+from collection_manager.services.history_manager.IngestionHistory import 
(IngestionHistory, IngestionHistoryBuilder)
 from common.async_utils.AsyncUtils import run_in_executor
-from collection_manager.services.history_manager.IngestionHistory import 
IngestionHistory
-from collection_manager.services.history_manager.IngestionHistory import 
IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import 
md5sum_from_filepath
 
 logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
@@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_granules = 
pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
             self._solr_datasets = 
pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
-            self._signature_fun = md5sum_from_filepath if signature_fun is 
None else signature_fun
+            self._signature_fun = signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
         except requests.exceptions.RequestException:
             raise DatasetIngestionHistorySolrException(f"solr instance 
unreachable {solr_url}")
@@ -67,7 +64,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': int(self._latest_ingested_file_update)}])
+                'latest_update_l': self._latest_ingested_file_update}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):
diff --git a/collection_manager/docker/Dockerfile 
b/collection_manager/docker/Dockerfile
index 2a57784..83e94ad 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -12,7 +12,9 @@ COPY collection_manager/requirements.txt 
/collection_manager/requirements.txt
 COPY collection_manager/README.md /collection_manager/README.md
 COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
-RUN cd /common && python setup.py install
+RUN cd /common && python setup.py install 
 RUN cd /collection_manager && python setup.py install
 
+RUN pip install boto3==1.16.10
+
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/collection_manager/docker/entrypoint.sh 
b/collection_manager/docker/entrypoint.sh
index 988dd2c..cad304a 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -7,5 +7,6 @@ python /collection_manager/collection_manager/main.py \
   $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo 
--rabbitmq-password=$RABBITMQ_PASSWORD) \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
-  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
-  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
+  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) \
+  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) \
+  $([[ ! -z "$S3_BUCKET" ]] && echo --s3-bucket=$S3_BUCKET)
diff --git a/collection_manager/requirements.txt 
b/collection_manager/requirements.txt
index ee12c89..c4b6323 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -3,5 +3,7 @@ pystache==0.5.4
 pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
-aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
+aiohttp==3.7.2
+aio-pika==6.7.1
\ No newline at end of file
diff --git a/collection_manager/setup.py b/collection_manager/setup.py
index 0616d0f..e1178f8 100644
--- a/collection_manager/setup.py
+++ b/collection_manager/setup.py
@@ -29,7 +29,7 @@ setuptools.setup(
         "Operating System :: OS Independent",
         "Development Status :: 4 - Beta",
     ],
-    python_requires='>=3.6',
+    python_requires='>=3.8',
     include_package_data=True,
     install_requires=pip_requirements
 )
diff --git 
a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
 
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index 07ab0e1..8bd939e 100644
--- 
a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ 
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -36,7 +36,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await 
ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await 
ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -47,7 +47,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await 
ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await 
ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -57,7 +57,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, 
md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(await 
ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await 
ingestion_history._already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':
diff --git a/collection_manager/tests/services/test_S3Observer.py 
b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', 
S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))
diff --git a/granule_ingester/conda-requirements.txt 
b/granule_ingester/conda-requirements.txt
index da92b1e..810e278 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -7,5 +7,4 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/Dockerfile 
b/granule_ingester/docker/Dockerfile
index 57bacff..1e7aedd 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -18,6 +18,8 @@ RUN ./install_nexusproto.sh
 RUN cd /common && python setup.py install
 RUN cd /sdap && python setup.py install
 
+RUN pip install boto3==1.16.10
+
 RUN apk del .build-deps
 
 ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/requirements.txt 
b/granule_ingester/requirements.txt
index 9b06860..d82e6ce 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,6 +1,7 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
-aioboto3
+aioboto3==8.0.5
 tblib==1.6.0
 pysolr==3.9.0
-kazoo==2.8.0
\ No newline at end of file
+kazoo==2.8.0
+aio-pika==6.7.1
\ No newline at end of file

Reply via email to