This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit cd4278526f192d1a9ea82561dcb1763c225cc090 Author: Eamon Ford <[email protected]> AuthorDate: Thu Jun 25 16:10:27 2020 -0700 SDAP-237 Dockerize Collection Manager (#4) Co-authored-by: Eamon Ford <[email protected]> --- collection_manager/README.md | 132 ++++++++------------- collection_manager/collection_manager/main.py | 33 +++--- collection_manager/containers/docker/Dockerfile | 11 -- .../containers/kubernetes/data-volume.yml | 35 ------ collection_manager/containers/kubernetes/job.yml | 39 ------ .../containers/kubernetes/sdap_ingester_config.yml | 38 ------ collection_manager/docker/Dockerfile | 16 +++ collection_manager/docker/entrypoint.sh | 10 ++ collection_manager/setup.py | 8 +- granule_ingester/granule_ingester/main.py | 3 +- 10 files changed, 100 insertions(+), 225 deletions(-) diff --git a/collection_manager/README.md b/collection_manager/README.md index cbaf1fb..9d00cbb 100644 --- a/collection_manager/README.md +++ b/collection_manager/README.md @@ -1,103 +1,75 @@ -# SDAP manager for ingestion of datasets +# SDAP Collection Manager -## Prerequisites - -### python 3 - -Install anaconda for python 3. From the graphic install for example for macos: - -https://www.anaconda.com/distribution/#macos - -### git lfs (for development) - -Git lfs for the deployment from git, see https://git-lfs.github.com/ - -If not available you have to get netcdf files for test, if you do need the tests. - -### Deployed nexus on kubernetes cluster +The SDAP Collection Manager is a service that watches a YAML file (the [Collections +Configuration](#the-collections-configuration-file) file) stored on the filesystem, and all the directories listed in that +file. Whenever new granules are added to any of the watched directories, the Collection +Manager service will publish a message to RabbitMQ to be picked up by the Granule Ingester +(`/granule_ingester` in this repository), which will then ingest the new granules. -See project https://github.com/apache/incubator-sdap-nexus - $ helm install nexus . --namespace=sdap --dependency-update -f ~/overridden-nexus-values.yml - -For development purpose, you might want to expose solr port outside kubernetes - - kubectl port-forward solr-set-0 8983:8983 -n sdap +## Prerequisites - -## For developers +Python 3.7 -### deploy project +## Building the service +From `incubator-sdap-ingester/collection_manager`, run: - $ bash - $ git clone ... - $ cd sdap_ingest_manager - $ python -m venv venv - $ source ./venv/bin/activate - $ pip install . - $ pytest -s + $ python setup.py install -Note the command pip install -e . does not work as it does not deploy the configuration files. - -### Update the project - -Update the code and the test with your favorite IDE (e.g. pyCharm). - -### Launch for development/tests - -### Prerequisite -Deploy a local rabbitmq service, for example with docker. +## Running the service +From `incubator-sdap-ingester/collection_manager`, run: - docker run -d --hostname localhost -p 5672:5672 --name rabbitmq rabbitmq:3 - - -### Launch the service + $ python collection_manager/main.py -h + +### The Collections Configuration File +A path to a collections configuration file must be passed in to the Collection Manager +at startup via the `--collections-path` parameter. Below is an example of what the +collections configuration file should look like: -The service reads the collection configuration and submit granule ingestion messages to the message broker (rabbitmq). -For each collection, 2 ingestion priority levels are proposed: the nominal priority, the priority for forward processing (newer files), usually higher. -An history of the ingested granules is managed so that the ingestion can stop and re-start anytime. +```yaml +# collections.yaml - cd collection_manager - python main.py -h - python main.py --collections ../tests/resources/data/collections.yml --history-path=/tmp +collections: -# Containerization + # The identifier for the dataset as it will appear in NEXUS. + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND -TO BE UPDATED + # The local path to watch for NetCDF granule files to be associated with this dataset. + # Supports glob-style patterns. + path: /opt/data/grace/*land*.nc -## Docker + # The name of the NetCDF variable to read when ingesting granules into NEXUS for this dataset. + variable: lwe_thickness - docker build . -f containers/docker/config-operator/Dockerfile --no-cache --tag tloubrieu/sdap-ingest-manager:latest - -To publish the docker image on dockerhub do (step necessary for kubernetes deployment): + # An integer priority level to use when publishing messages to RabbitMQ for historical data. + # Higher number = higher priority. + priority: 1 - docker login - docker push tloubrieu/sdap-ingest-manager:latest - -## Kubernetes - -### Launch the service + # An integer priority level to use when publishing messages to RabbitMQ for forward-processing data. + # Higher number = higher priority. + forward-processing-priority: 5 - kubectl apply -f containers/kubernetes/job.yml -n sdap - -Delete the service: + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN + path: /opt/data/grace/*ocean*.nc + variable: lwe_thickness + priority: 2 + forward-processing-priority: 6 - kubectl delete jobs --all -n sdap - - + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 + path: /opt/data/avhrr/*.nc + variable: analysed_sst + priority: 1 - +``` +## Running the tests +From `incubator-sdap-ingester/collection_manager`, run: + $ pip install pytest + $ pytest - - - - - - - - - +## Building the Docker image +From `incubator-sdap-ingester/collection_manager`, run: + $ docker build . -f docker/Dockerfile -t nexusjpl/collection-manager diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index bc2d356..d8d2a5a 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -8,7 +8,7 @@ from collection_manager.services.history_manager import SolrIngestionHistoryBuil logging.basicConfig(level=logging.INFO) logging.getLogger("pika").setLevel(logging.WARNING) -logger = logging.getLogger("collection_manager") +logger = logging.getLogger(__name__) def check_path(path) -> str: @@ -18,34 +18,35 @@ def check_path(path) -> str: def get_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Run ingestion for a list of collection ingestion streams") - parser.add_argument("--refresh", - help="refresh interval in seconds to check for new or updated granules", - default=300) - parser.add_argument("--collections", + parser = argparse.ArgumentParser(description="Watch the filesystem for new granules, and publish messages to " + "RabbitMQ whenever they become available.") + parser.add_argument("--collections-path", help="Absolute path to collections configuration file", + metavar="PATH", required=True) - parser.add_argument('--rabbitmq_host', + history_group = parser.add_mutually_exclusive_group(required=True) + history_group.add_argument("--history-path", + metavar="PATH", + help="Absolute path to ingestion history local directory") + history_group.add_argument("--history-url", + metavar="URL", + help="URL to ingestion history solr database") + parser.add_argument('--rabbitmq-host', default='localhost', metavar='HOST', help='RabbitMQ hostname to connect to. (Default: "localhost")') - parser.add_argument('--rabbitmq_username', + parser.add_argument('--rabbitmq-username', default='guest', metavar='USERNAME', help='RabbitMQ username. (Default: "guest")') - parser.add_argument('--rabbitmq_password', + parser.add_argument('--rabbitmq-password', default='guest', metavar='PASSWORD', help='RabbitMQ password. (Default: "guest")') - parser.add_argument('--rabbitmq_queue', + parser.add_argument('--rabbitmq-queue', default="nexus", metavar="QUEUE", help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') - history_group = parser.add_mutually_exclusive_group(required=True) - history_group.add_argument("--history-path", - help="Absolute path to ingestion history local directory") - history_group.add_argument("--history-url", - help="URL to ingestion history solr database") return parser.parse_args() @@ -65,7 +66,7 @@ def main(): publisher.connect() collection_processor = CollectionProcessor(message_publisher=publisher, history_manager_builder=history_manager_builder) - collection_watcher = CollectionWatcher(collections_path=options.collections, + collection_watcher = CollectionWatcher(collections_path=options.collections_path, collection_updated_callback=collection_processor.process_collection, granule_updated_callback=collection_processor.process_granule) diff --git a/collection_manager/containers/docker/Dockerfile b/collection_manager/containers/docker/Dockerfile deleted file mode 100644 index 3ba8da7..0000000 --- a/collection_manager/containers/docker/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM python:3 - -# Add kubernetes client to create other pods (ingester) -RUN apt-get update && apt-get install -y apt-transport-https gnupg2 -RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - -RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list -RUN apt-get update && apt-get install -y kubectl - -RUN pip install https://github.com/tloubrieu-jpl/incubator-sdap-nexus-ingestion-manager/releases/download/0.4.0+dev/sdap_ingest_manager-0.4.0+dev-py3-none-any.whl - -CMD bash diff --git a/collection_manager/containers/kubernetes/data-volume.yml b/collection_manager/containers/kubernetes/data-volume.yml deleted file mode 100644 index b2d3815..0000000 --- a/collection_manager/containers/kubernetes/data-volume.yml +++ /dev/null @@ -1,35 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: data-volume - labels: - name: data-volume -spec: - capacity: - storage: 3Gi - volumeMode: Filesystem - accessModes: - - ReadWriteOnce - persistentVolumeReclaimPolicy: Delete - storageClassName: hostpath - hostPath: - path: /Users/loubrieu/PycharmProjects/sdap_ingest_manager/sdap_ingest_manager/ingestion_order_executor/history_manager/data - type: Directory - ---- - -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: data-volume-claim -spec: - accessModes: - - ReadWriteOnce - volumeMode: Filesystem - resources: - requests: - storage: 3Gi - storageClassName: hostpath - selector: - matchLabels: - name: "data-volume" diff --git a/collection_manager/containers/kubernetes/job.yml b/collection_manager/containers/kubernetes/job.yml deleted file mode 100644 index 1d8bc16..0000000 --- a/collection_manager/containers/kubernetes/job.yml +++ /dev/null @@ -1,39 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: collection-ingester -spec: - template: - spec: - containers: - - name: collections-ingester - image: tloubrieu/sdap-ingest-manager:latest - imagePullPolicy: IfNotPresent - command: ["run_collections", "--config=/opt/sdap_ingester_config/"] - volumeMounts: - - name: config-vol - mountPath: /opt/sdap_ingester_config/ - - name: data-volume-for-collection-ingester - mountPath: /data - readOnly: true - volumes: - - name: config-vol - configMap: - name: collection-ingester-config - - name: data-volume-for-collection-ingester - #hostPath: - # path: /Users/loubrieu/PycharmProjects/sdap_ingest_manager/sdap_ingest_manager/ingestion_order_executor/history_manager/data - # type: Directory - persistentVolumeClaim: - claimName: data-volume-claim - - restartPolicy: Never - backoffLimit: 4 - ---- - - - - - - diff --git a/collection_manager/containers/kubernetes/sdap_ingester_config.yml b/collection_manager/containers/kubernetes/sdap_ingester_config.yml deleted file mode 100644 index 425b687..0000000 --- a/collection_manager/containers/kubernetes/sdap_ingester_config.yml +++ /dev/null @@ -1,38 +0,0 @@ -apiVersion: v1 -data: - collections.yml: |+ - # collection id with only letter and - - # path: regular expression matching the netcdf files which compose the collection - # variable: netcdf variable to be ingested (only one per dataset) - # priority: order in which collections will be processed, the smaller numbers first. - avhrr-oi-analysed-sst: - path: /data/avhrr_oi/*.nc - variable: analysed_sst - priority: 2 - - sdap_ingest_manager.ini: |+ - [COLLECTIONS_YAML_CONFIG] - # config_path is the value sent as argument to the run_collection command, default is /opt/sdap_ingester_config - yaml_file = %(config_path)s/collections.yml - - [OPTIONS] - # set to False to actually call the ingestion command for each granule - # relative path starts at {sys.prefix}/.sdap_ingest_manager - dry_run = False - # set to True to automatically list the granules as seen on the nfs server when they are mounted on the local file system. - deconstruct_nfs = False - # number of parallel ingestion pods on kubernetes (1 per granule) - parallel_pods = 8 - - [INGEST] - # kubernetes namespace where the sdap cluster is deployed - kubernetes_namespace = sdap - - -kind: ConfigMap -metadata: - creationTimestamp: "2020-04-17T00:11:46Z" - name: collection-ingester-config - resourceVersion: "2398917" - selfLink: /api/v1/namespaces/default/configmaps/collection-ingester - uid: b914e302-736c-4c25-9943-ebc33db418ce diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile new file mode 100644 index 0000000..ce1b577 --- /dev/null +++ b/collection_manager/docker/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3 + +RUN apt-get update && apt-get install -y apt-transport-https gnupg2 +RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - +RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list +RUN apt-get update && apt-get install -y kubectl + +COPY /collection_manager /collection_manager/collection_manager +COPY /setup.py /collection_manager/setup.py +COPY /requirements.txt /collection_manager/requirements.txt +COPY /README.md /collection_manager/README.md +COPY /docker/entrypoint.sh /entrypoint.sh + +RUN cd /collection_manager && python setup.py install + +ENTRYPOINT ["/bin/bash", "/entrypoint.sh"] diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh new file mode 100644 index 0000000..eb88f75 --- /dev/null +++ b/collection_manager/docker/entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +python /collection_manager/collection_manager/main.py \ + $([[ ! -z "$COLLECTIONS_PATH" ]] && echo --collections-path=$COLLECTIONS_PATH) \ + $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \ + $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \ + $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \ + $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \ + $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \ + $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) diff --git a/collection_manager/setup.py b/collection_manager/setup.py index 49b0d75..1542486 100644 --- a/collection_manager/setup.py +++ b/collection_manager/setup.py @@ -1,9 +1,7 @@ -import setuptools -import os -import subprocess -import sys import re +import setuptools + PACKAGE_NAME = "sdap_collection_manager" with open("./collection_manager/__init__.py") as fi: @@ -24,7 +22,7 @@ setuptools.setup( description="a helper to ingest data in sdap", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/tloubrieu-jpl/incubator-sdap-nexus-ingestion-manager", + url="https://github.com/apache/incubator-sdap-ingester", packages=setuptools.find_packages(), classifiers=[ "Programming Language :: Python :: 3", diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 29795f7..5a8fc2d 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -45,7 +45,8 @@ async def run_health_checks(dependencies: List[HealthCheck]): async def main(): - parser = argparse.ArgumentParser(description='Process some integers.') + parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process ' + 'and ingest a granule for each message that comes through.') parser.add_argument('--rabbitmq_host', default='localhost', metavar='HOST',
