This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e93f75  feat: CLI script to ingest S3 files (#5)
4e93f75 is described below

commit 4e93f7521a9d7d58b351b5f59a8be18b0a646534
Author: wphyojpl <[email protected]>
AuthorDate: Wed Jun 22 13:03:54 2022 -0700

    feat: CLI script to ingest S3 files (#5)
    
    * feat: add CLI script
    
    * chore: update readme
    
    * chore: add changelog
---
 CHANGELOG.md                      | 14 ++++++
 README.md                         | 32 ++++++++++++-
 parquet_cli/__init__.py           |  0
 parquet_cli/ingest_s3/__init__.py |  0
 parquet_cli/ingest_s3/__main__.py | 94 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..e44938a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,14 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+- CDMS-xxx: Added `CLI` script to ingest S3 data into the Parquet system
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
diff --git a/README.md b/README.md
index b573410..d08c868 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,35 @@
-# parquet_test_1
+# Insitu Data in Parquet format stored in S3
 
+### How to ingest an insitu JSON file to Parquet
+- Assumption: K8s is successfully deployed
+- Download this repo
+- (optional) create a separate Python 3.6 environment
+- install dependencies
+
+        python3 setup.py install
+- setup AWS tokens
+    
+        export AWS_ACCESS_KEY_ID=xxx
+        export AWS_SECRET_ACCESS_KEY=xxx
+        export AWS_SESSION_TOKEN=really.long.token
+        export AWS_REGION=us-west-2
+    - alternatively the `default` profile under `~/.aws/credentials` can be set up as well
+- setup current directory to `PYTHONPATH`
+        
+        PYTHONPATH="${PYTHONPATH}:/absolute/path/to/current/dir/"
+- run the script: 
+
+        python3 -m parquet_cli.ingest_s3 --help
+    - sample script:
+    
+            python3 -m parquet_cli.ingest_s3 \
+              --LOG_LEVEL 30 \
+              --CDMS_DOMAIN https://doms.jpl.nasa.gov/insitu  \
+              --CDMS_BEARER_TOKEN Mock-CDMS-Flask-Token  \
+              --PARQUET_META_TBL_NAME cdms_parquet_meta_dev_v1  \
+              --BUCKET_NAME cdms-dev-ncar-in-situ-stage  \
+              --KEY_PREFIX cdms_icoads_2017-01-01.json
+  
 ### Ref:
 - how to replace parquet file partially
 ```
diff --git a/parquet_cli/__init__.py b/parquet_cli/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/parquet_cli/ingest_s3/__init__.py 
b/parquet_cli/ingest_s3/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/parquet_cli/ingest_s3/__main__.py 
b/parquet_cli/ingest_s3/__main__.py
new file mode 100644
index 0000000..c4a2948
--- /dev/null
+++ b/parquet_cli/ingest_s3/__main__.py
@@ -0,0 +1,94 @@
import argparse
import base64
import logging
import os

# Placeholder values for environment variables that parquet_flask appears to
# require at import time -- NOTE(review): presumably its config module reads
# these on import; confirm. They MUST be set before the import below, even if
# only to empty strings; real values are not needed for the CLI path.
os.environ['master_spark_url'] = ''
os.environ['spark_app_name'] = ''
os.environ['parquet_file_name'] = ''
os.environ['in_situ_schema'] = ''
os.environ['authentication_type'] = ''
os.environ['authentication_key'] = ''
os.environ['parquet_metadata_tbl'] = ''

# Import deliberately placed AFTER the os.environ assignments above.
from parquet_flask.cdms_lambda_func.lambda_func_env import LambdaFuncEnv
+
+
class IngestS3Entry:
    """CLI entry point that ingests S3-hosted insitu JSON files into the
    Parquet system by invoking the CDMS ingestion logic for every matching
    object under a bucket/prefix.

    AWS credentials are expected to be present in the environment (or in the
    `default` profile of ``~/.aws/credentials``) before this runs.
    """

    # argparse option names for the S3 location; the remaining option names
    # come from LambdaFuncEnv so they stay in sync with the lambda env keys.
    BUCKET_NAME_KEY = 'BUCKET_NAME'
    KEY_PREFIX_KEY = 'KEY_PREFIX'

    def __get_args(self) -> argparse.Namespace:
        """Build the CLI argument parser and parse ``sys.argv``.

        Returns:
            argparse.Namespace with one attribute per option name.
        """
        parser = argparse.ArgumentParser(
            description='Ingesting 1 or more S3 files into Parquet. '
                        'Note that AWS environment variables should be set before running this')
        parser.add_argument(f'--{LambdaFuncEnv.CDMS_DOMAIN}',
                            help='CDMS Flask domain where ingestion endpoint resides. '
                                 'Need to include `/insitu` prefix',
                            metavar='http://localhost:9801/insitu',
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.CDMS_BEARER_TOKEN}',
                            help='plain-text security token that is set in CDMS Flask pod '
                                 'during K8s deployment. Check in Dockerfile',
                            metavar='mock-token',
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.PARQUET_META_TBL_NAME}',
                            help='dynamo DB table where parquet file ingestion records are stored. '
                                 'Check in Values.yaml',
                            metavar='cdms_parquet_meta_dev_v1',
                            required=True)
        parser.add_argument(f'--{self.BUCKET_NAME_KEY}',
                            help='name of S3 bucket',
                            metavar='icoads-bucket',
                            required=True)
        parser.add_argument(f'--{self.KEY_PREFIX_KEY}',
                            # typo fix: "filees" -> "files"
                            help='s3 prefix. It will ingest all files starting with this prefix. '
                                 'If all files need to be ingested, pass empty value. '
                                 'If only 1 file needs to be ingested, pass the exact file path',
                            metavar='2021/01/01/samplefile.json.gz',
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.LOG_LEVEL}',
                            help='python log level in integer.',
                            default='10',
                            metavar='10',
                            required=False)
        return parser.parse_args()

    def start(self):
        """Parse CLI args, export them as the environment variables expected
        by parquet_flask, then ingest every ``.json`` / ``.json.gz`` object
        under the given bucket and key prefix.

        A failure on one object is reported and does not stop the remaining
        ingestions (best-effort loop).
        """
        options = self.__get_args()
        logging.basicConfig(level=int(getattr(options, LambdaFuncEnv.LOG_LEVEL)),
                            format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s')

        os.environ[LambdaFuncEnv.LOG_LEVEL] = getattr(options, LambdaFuncEnv.LOG_LEVEL)
        os.environ[LambdaFuncEnv.CDMS_DOMAIN] = getattr(options, LambdaFuncEnv.CDMS_DOMAIN)
        # downstream code expects the bearer token base64-encoded
        os.environ[LambdaFuncEnv.CDMS_BEARER_TOKEN] = base64.standard_b64encode(
            getattr(options, LambdaFuncEnv.CDMS_BEARER_TOKEN).encode()).decode()
        os.environ[LambdaFuncEnv.PARQUET_META_TBL_NAME] = getattr(options, LambdaFuncEnv.PARQUET_META_TBL_NAME)
        bucket_name = getattr(options, self.BUCKET_NAME_KEY)
        key_prefix = getattr(options, self.KEY_PREFIX_KEY)

        # Imported here, not at module top: presumably these modules read the
        # environment variables set above at import time -- TODO confirm; keep
        # this import after the os.environ updates.
        from parquet_flask.cdms_lambda_func.ingest_s3_to_cdms.ingest_s3_to_cdms import IngestS3ToCdms
        from parquet_flask.aws.aws_s3 import AwsS3

        s3 = AwsS3()
        for key, size in s3.get_child_s3_files(bucket_name, key_prefix,
                                               lambda x: x['Key'].endswith('.json') or x['Key'].endswith('.json.gz')):
            try:
                print(f'working on: {key}')
                IngestS3ToCdms().start(event={'s3_url': f's3://{bucket_name}/{key}'})
            except Exception as e:
                # best-effort: report the failure and continue with next object
                print(f'error while processing: s3://{bucket_name}/{key}. details: {str(e)}')
        return
+
+
if __name__ == '__main__':
    # Sample usage:
    #
    #   python3 -m parquet_cli.ingest_s3 \
    #     --LOG_LEVEL 30 \
    #     --CDMS_DOMAIN https://doms.jpl.nasa.gov/insitu \
    #     --CDMS_BEARER_TOKEN Mock-Token \
    #     --PARQUET_META_TBL_NAME cdms_parquet_meta_dev_v1 \
    #     --BUCKET_NAME cdms-dev-ncar-in-situ-stage \
    #     --KEY_PREFIX cdms_icoads_2017-01-01.json
    entry_point = IngestS3Entry()
    entry_point.start()

Reply via email to