This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git
The following commit(s) were added to refs/heads/master by this push:
new 4e93f75 feat: CLI script to ingest S3 files (#5)
4e93f75 is described below
commit 4e93f7521a9d7d58b351b5f59a8be18b0a646534
Author: wphyojpl <[email protected]>
AuthorDate: Wed Jun 22 13:03:54 2022 -0700
feat: CLI script to ingest S3 files (#5)
* feat: add CLI script
* chore: update readme
* chore: add changelog
---
CHANGELOG.md | 14 ++++++
README.md | 32 ++++++++++++-
parquet_cli/__init__.py | 0
parquet_cli/ingest_s3/__init__.py | 0
parquet_cli/ingest_s3/__main__.py | 94 +++++++++++++++++++++++++++++++++++++++
5 files changed, 139 insertions(+), 1 deletion(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..e44938a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,14 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+- CDMS-xxx: Added `CLI` script to ingest S3 data into the Parquet system
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
diff --git a/README.md b/README.md
index b573410..d08c868 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,35 @@
-# parquet_test_1
+# Insitu Data in Parquet format stored in S3
+### How to ingest an in-situ JSON file to Parquet
+- Assumption: K8s is successfully deployed
+- Download this repo
+- (optional) create a separate Python 3.6 environment
+- install dependencies
+
+ python3 setup.py install
+- setup AWS tokens
+
+ export AWS_ACCESS_KEY_ID=xxx
+ export AWS_SECRET_ACCESS_KEY=xxx
+ export AWS_SESSION_TOKEN=really.long.token
+ export AWS_REGION=us-west-2
+ - alternatively the `default` profile under `~/.aws/credentials` can be
set up as well
+- setup current directory to `PYTHONPATH`
+
+ PYTHONPATH="${PYTHONPATH}:/absolute/path/to/current/dir/"
+- run the script:
+
+ python3 -m parquet_cli.ingest_s3 --help
+ - sample script:
+
+ python3 -m parquet_cli.ingest_s3 \
+ --LOG_LEVEL 30 \
+ --CDMS_DOMAIN https://doms.jpl.nasa.gov/insitu \
+ --CDMS_BEARER_TOKEN Mock-CDMS-Flask-Token \
+ --PARQUET_META_TBL_NAME cdms_parquet_meta_dev_v1 \
+ --BUCKET_NAME cdms-dev-ncar-in-situ-stage \
+ --KEY_PREFIX cdms_icoads_2017-01-01.json
+
### Ref:
- how to replace parquet file partially
```
diff --git a/parquet_cli/__init__.py b/parquet_cli/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/parquet_cli/ingest_s3/__init__.py
b/parquet_cli/ingest_s3/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/parquet_cli/ingest_s3/__main__.py
b/parquet_cli/ingest_s3/__main__.py
new file mode 100644
index 0000000..c4a2948
--- /dev/null
+++ b/parquet_cli/ingest_s3/__main__.py
@@ -0,0 +1,94 @@
+import argparse
+import base64
+import logging
+import os
+
+# NOTE(review): placeholder values so the `parquet_flask` import below does not
+# fail — presumably `parquet_flask` reads these environment variables at import
+# time; confirm against parquet_flask.cdms_lambda_func.lambda_func_env. The
+# values actually used are set later in IngestS3Entry.start() from CLI options.
+os.environ['master_spark_url'] = ''
+os.environ['spark_app_name'] = ''
+os.environ['parquet_file_name'] = ''
+os.environ['in_situ_schema'] = ''
+os.environ['authentication_type'] = ''
+os.environ['authentication_key'] = ''
+os.environ['parquet_metadata_tbl'] = ''
+from parquet_flask.cdms_lambda_func.lambda_func_env import LambdaFuncEnv
+
+
class IngestS3Entry:
    """
    CLI entry point that ingests in-situ JSON files stored in S3 into the
    CDMS Parquet system.

    It lists every object under a bucket/prefix that ends with ``.json`` or
    ``.json.gz`` and runs the lambda ingestion logic locally for each one.
    AWS credentials are expected in the environment (or the default profile).
    """

    # argparse option names for the two S3-location flags
    BUCKET_NAME_KEY = 'BUCKET_NAME'
    KEY_PREFIX_KEY = 'KEY_PREFIX'

    def __get_args(self) -> argparse.Namespace:
        """
        Build the argument parser and parse the command line.

        :return: parsed CLI options (all values are strings)
        """
        parser = argparse.ArgumentParser(description="Ingesting 1 or more S3 files into Parquet. Note that AWS environment variables should be set before running this")
        parser.add_argument(f'--{LambdaFuncEnv.CDMS_DOMAIN}',
                            help="CDMS Flask domain where ingestion endpoint resides. Need to include `/insitu` prefix",
                            metavar="http://localhost:9801/insitu",
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.CDMS_BEARER_TOKEN}',
                            help="plain-text security token that is set in CDMS Flask pod during K8s deployment. Check in Dockerfile",
                            metavar="mock-token",
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.PARQUET_META_TBL_NAME}',
                            help="dynamo DB table where parquet file ingestion records are stored. Check in Values.yaml",
                            metavar="cdms_parquet_meta_dev_v1",
                            required=True)
        parser.add_argument(f'--{self.BUCKET_NAME_KEY}',
                            help="name of S3 bucket",
                            metavar="icoads-bucket",
                            required=True)
        parser.add_argument(f'--{self.KEY_PREFIX_KEY}',
                            # fixed typo: "filees" -> "files"
                            help="s3 prefix. It will ingest all files starting with this prefix. If all files need to be ingested, pass empty value. If only 1 file needs to be ingested, pass the exact file path",
                            metavar='2021/01/01/samplefile.json.gz',
                            required=True)
        parser.add_argument(f'--{LambdaFuncEnv.LOG_LEVEL}',
                            help="python log level in integer.",
                            default='10',  # kept as str: the value is also exported via os.environ
                            metavar='10',
                            required=False)
        return parser.parse_args()

    def start(self):
        """
        Parse CLI options, export them as the environment variables that
        `parquet_flask` expects, then ingest every matching object under the
        given bucket/prefix.

        Per-file failures are reported and skipped so that one bad file does
        not abort the whole run.
        """
        options = self.__get_args()
        logging.basicConfig(level=int(getattr(options, LambdaFuncEnv.LOG_LEVEL)),
                            format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")

        os.environ[LambdaFuncEnv.LOG_LEVEL] = getattr(options, LambdaFuncEnv.LOG_LEVEL)
        os.environ[LambdaFuncEnv.CDMS_DOMAIN] = getattr(options, LambdaFuncEnv.CDMS_DOMAIN)
        # the bearer token is exported base64-encoded — presumably the consumer decodes it; confirm downstream
        os.environ[LambdaFuncEnv.CDMS_BEARER_TOKEN] = base64.standard_b64encode(getattr(options, LambdaFuncEnv.CDMS_BEARER_TOKEN).encode()).decode()
        os.environ[LambdaFuncEnv.PARQUET_META_TBL_NAME] = getattr(options, LambdaFuncEnv.PARQUET_META_TBL_NAME)
        bucket_name = getattr(options, self.BUCKET_NAME_KEY)
        key_prefix = getattr(options, self.KEY_PREFIX_KEY)

        # imported late so the environment variables above are set before the modules load
        from parquet_flask.cdms_lambda_func.ingest_s3_to_cdms.ingest_s3_to_cdms import IngestS3ToCdms
        from parquet_flask.aws.aws_s3 import AwsS3

        s3 = AwsS3()
        # object size is returned by the listing but not needed here
        for key, _size in s3.get_child_s3_files(bucket_name, key_prefix,
                                                lambda x: x['Key'].endswith(('.json', '.json.gz'))):
            try:
                print(f'working on: {key}')
                IngestS3ToCdms().start(event={'s3_url': f's3://{bucket_name}/{key}'})
            except Exception as e:  # best-effort: report and continue with the next file
                print(f'error while processing: s3://{bucket_name}/{key}. details: {str(e)}')
        return
+
+
if __name__ == '__main__':
    # Sample usage:
    #
    #     python3 -m parquet_cli.ingest_s3 \
    #         --LOG_LEVEL 30 \
    #         --CDMS_DOMAIN https://doms.jpl.nasa.gov/insitu \
    #         --CDMS_BEARER_TOKEN Mock-Token \
    #         --PARQUET_META_TBL_NAME cdms_parquet_meta_dev_v1 \
    #         --BUCKET_NAME cdms-dev-ncar-in-situ-stage \
    #         --KEY_PREFIX cdms_icoads_2017-01-01.json
    IngestS3Entry().start()