This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 1a15971e946950e86769fd3763ced15a2e5bb00d
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Oct 29 15:33:56 2020 -0700

    fix signature_fun for s3
---
 collection_manager/collection_manager/main.py      | 21 +++++++++++----
 .../history_manager/FileIngestionHistory.py        |  3 +--
 .../services/history_manager/IngestionHistory.py   | 30 +++++++++++++---------
 .../history_manager/SolrIngestionHistory.py        |  7 ++---
 4 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 044cb87..3dba6e0 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -3,8 +3,11 @@ import asyncio
 import logging
 import os
 
-from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
-from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
+from collection_manager.services import (CollectionProcessor,
+                                         CollectionWatcher, MessagePublisher)
+from collection_manager.services.history_manager import (
+    FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
+    md5sum_from_filepath)
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
@@ -58,11 +61,19 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
+        ENABLE_S3 = False
+
+        if ENABLE_S3:
+            signature_fun = None
+        else:
+            signature_fun = md5sum_from_filepath
 
         if options.history_path:
-            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
+            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
+                                                                  signature_fun=signature_fun)
         else:
-            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
+            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url,
+                                                                  signature_fun=signature_fun)
         async with MessagePublisher(host=options.rabbitmq_host,
                                     username=options.rabbitmq_username,
                                     password=options.rabbitmq_password,
@@ -72,7 +83,7 @@ async def main():
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh),
-                                                   s3=True)
+                                                   s3=ENABLE_S3)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index ffa065f..cf92997 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -4,7 +4,6 @@ from pathlib import Path
 
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logger = logging.getLogger(__name__)
 
@@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory):
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
-        self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+        self._signature_fun = signature_fun
         self._history_dict = {}
         self._load_history_dict()
 
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ea50ffb..d901690 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -38,28 +38,29 @@ class GranuleStatus(Enum):
 
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update = None
+    _latest_ingested_file_update: float = None
 
-    async def push(self, file_path: str, modified_time: datetime):
+    async def push(self, file_path: str, modified_datetime: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
+        modified_timestamp = int(modified_datetime.timestamp())
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path)
+        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = modified_time 
+            self._latest_ingested_file_update = modified_timestamp
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_timestamp)
 
         await self._save_latest_timestamp()
 
     async def get_granule_status(self,
                                  file_path: str,
-                                 modified_time: datetime,
+                                 modified_datetime: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -76,9 +77,11 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
+        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp())
+
+        if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -102,14 +105,13 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    async def _already_ingested(self, file_path: str) -> bool:
+    async def _already_ingested(self, file_path: str, signature) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path)
         return signature == await self._get_signature(file_name)
 
     @abstractmethod
@@ -132,7 +134,11 @@ class IngestionHistory(ABC):
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        is_after_from = start_date.timestamp() < date if start_date else True
-        is_before_to = end_date.timestamp() > date if end_date else True
+        is_after_from = start_date.timestamp() < date.timestamp() if start_date else True
+        is_before_to = end_date.timestamp() > date.timestamp() if end_date else True
 
         return is_after_from and is_before_to
+
+    @staticmethod
+    def _signature_from_timestamp(timestamp: float):
+        return str(int(timestamp))
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 59f5cd7..ebed073 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -3,11 +3,8 @@ import logging
 
 import pysolr
 import requests
-
+from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
 from common.async_utils.AsyncUtils import run_in_executor
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
@@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
             self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
-            self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+            self._signature_fun = signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
         except requests.exceptions.RequestException:
             raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")

Reply via email to