This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git
The following commit(s) were added to refs/heads/master by this push:
new e040f94 Implement parquet partition by platform (#15)
e040f94 is described below
commit e040f949ff5adf487798a0f392eb8e519318be4e
Author: Jason Min-Liang Kang <[email protected]>
AuthorDate: Thu May 11 15:43:11 2023 -0700
Implement parquet partition by platform (#15)
* Implement parquet partition by platform
* Update changelog
* Update changelog with SDAP ticket number
---
CHANGELOG.md | 9 ++++
.../parquet.spark.helm/templates/deployment.yaml | 4 +-
k8s_spark/parquet.spark.helm/values.yaml | 15 ++++---
parquet_flask/io_logic/cdms_constants.py | 2 +-
parquet_flask/io_logic/ingest_new_file.py | 51 +++++++++++-----------
.../parquet_flask/io_logic/test_ingest_new_file.py | 23 ++++++----
6 files changed, 62 insertions(+), 42 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 90ce8a3..18fadf2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+### Added
+- SDAP-463: Added capability to further partition parquet objects/files by
platform
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
+
## [0.3.0] - 2022-07-13
### Added
- CDMS-xxx: Added `CLI` script to ingest S3 data into the Parquet system
diff --git a/k8s_spark/parquet.spark.helm/templates/deployment.yaml
b/k8s_spark/parquet.spark.helm/templates/deployment.yaml
index 8571ab9..8566caa 100644
--- a/k8s_spark/parquet.spark.helm/templates/deployment.yaml
+++ b/k8s_spark/parquet.spark.helm/templates/deployment.yaml
@@ -88,8 +88,8 @@ spec:
value: {{ .Values.flask_env.log_level }}
- name: spark_config_dict
value: {{ .Values.flask_env.spark_config_dict | toJson | quote
}}
- - name: geospatial_interval_by_project
- value: {{ .Values.flask_env.geospatial_interval_by_project |
toJson | quote }}
+ - name: geospatial_interval_by_platform
+ value: {{ .Values.flask_env.geospatial_interval_by_platform |
toJson | quote }}
{{- if .Values.extraEnvs }}
{{- toYaml .Values.extraEnvs | nindent 12 }}
{{- end }}
diff --git a/k8s_spark/parquet.spark.helm/values.yaml
b/k8s_spark/parquet.spark.helm/values.yaml
index e7098fc..72e5666 100644
--- a/k8s_spark/parquet.spark.helm/values.yaml
+++ b/k8s_spark/parquet.spark.helm/values.yaml
@@ -48,11 +48,16 @@ flask_env:
"spark.memory.offHeap.size":"512M"
}
- # The config, geo_spatial_interval_by_project, allows specifying geo-spartial
- # partition interval/limit at project level. If not specified, 30 is the
- # default value.
- geospatial_interval_by_project: {
- "ICOADS Release 3.0": "100"
+ # The config, geo_spatial_interval_by_platform, allows specifying
geo-spatial
+ # partition interval/limit at platform level. The dict is organized like this
+ # {<project>: {<platform_code_01: <interval>, <platform_code_02>:
<interval>}}.
+ # If not specified, 30 (which is the value defined in ingest_new_file.py) is
+ # the default value.
+ geospatial_interval_by_platform: {
+ "ICOADS Release 3.0": {
+ "23": "100",
+ "31": "50"
+ }
}
# AWS EKS IRSA is favored over AWS IAM User credentials when possible
diff --git a/parquet_flask/io_logic/cdms_constants.py
b/parquet_flask/io_logic/cdms_constants.py
index 28b69d8..ad35d84 100644
--- a/parquet_flask/io_logic/cdms_constants.py
+++ b/parquet_flask/io_logic/cdms_constants.py
@@ -60,4 +60,4 @@ class CDMSConstants:
max_lon = 'max_lon'
min_lon = 'min_lon'
- geospatial_interval_by_project = 'geospatial_interval_by_project'
\ No newline at end of file
+ geospatial_interval_by_platform = 'geospatial_interval_by_platform'
\ No newline at end of file
diff --git a/parquet_flask/io_logic/ingest_new_file.py
b/parquet_flask/io_logic/ingest_new_file.py
index e1239cc..339427b 100644
--- a/parquet_flask/io_logic/ingest_new_file.py
+++ b/parquet_flask/io_logic/ingest_new_file.py
@@ -37,28 +37,23 @@ LOGGER = logging.getLogger(__name__)
GEOSPATIAL_INTERVAL = 30
-def get_geospatial_interval(project: str) -> int:
+def get_geospatial_interval(project: str) -> dict:
"""
- Get geospatial interval from environment variable. If not found, return
default value.
+ Get geospatial interval dict object from environment variable. If not
found, return empty dict.
:param project: project name
- :return: geospatial interval
+ :return: geospatial interval dict
"""
- interval = GEOSPATIAL_INTERVAL
- geo_spatial_interval_by_project =
environ.get(CDMSConstants.geospatial_interval_by_project)
- if not geo_spatial_interval_by_project:
- return interval
- geo_spatial_interval_by_project_dict =
json.loads(geo_spatial_interval_by_project)
- if not isinstance(geo_spatial_interval_by_project_dict, dict):
- return interval
- if project not in geo_spatial_interval_by_project_dict:
- return interval
- try:
- return int(geo_spatial_interval_by_project_dict[project])
- except:
- LOGGER.exception(
- f'geo_spatial_interval_by_project_dict for {project} is not int:
{geo_spatial_interval_by_project_dict[project]}')
- return interval
+ interval_dict = {}
+ geo_spatial_interval_by_platform =
environ.get(CDMSConstants.geospatial_interval_by_platform)
+ if not geo_spatial_interval_by_platform:
+ return interval_dict
+ geo_spatial_interval_by_platform_dict =
json.loads(geo_spatial_interval_by_platform)
+ if not isinstance(geo_spatial_interval_by_platform_dict, dict):
+ return interval_dict
+ if project not in geo_spatial_interval_by_platform_dict or not
isinstance(geo_spatial_interval_by_platform_dict[project], dict):
+ return interval_dict
+ return geo_spatial_interval_by_platform_dict[project]
class IngestNewJsonFile:
@@ -97,23 +92,29 @@ class IngestNewJsonFile:
.withColumn(CDMSConstants.job_id_col, lit(job_id))\
.withColumn(CDMSConstants.provider_col, lit(provider))\
.withColumn(CDMSConstants.project_col, lit(project))
- geospatial_interval = get_geospatial_interval(project)
+ geospatial_interval_dict = get_geospatial_interval(project)
try:
df: DataFrame = df.withColumn(
CDMSConstants.geo_spatial_interval_col,
pyspark_functions.udf(
- lambda latitude, longitude: f'{int(latitude -
divmod(latitude, geospatial_interval)[1])}_{int(longitude - divmod(longitude,
geospatial_interval)[1])}',
+ lambda platform_code, latitude, longitude: f'{int(latitude
- divmod(latitude, int(geospatial_interval_dict.get(platform_code,
GEOSPATIAL_INTERVAL)))[1])}_{int(longitude - divmod(longitude,
int(geospatial_interval_dict.get(platform_code, GEOSPATIAL_INTERVAL)))[1])}',
StringType())(
+ df[CDMSConstants.platform_code_col],
df[CDMSConstants.lat_col],
df[CDMSConstants.lon_col]))
df: DataFrame = df.repartition(1) # combine to 1 data frame to
increase size
# .withColumn('ingested_date',
lit(TimeUtils.get_current_time_str()))
LOGGER.debug(f'create writer')
- all_partitions = [CDMSConstants.provider_col,
CDMSConstants.project_col, CDMSConstants.platform_code_col,
- CDMSConstants.geo_spatial_interval_col,
- CDMSConstants.year_col, CDMSConstants.month_col,
- CDMSConstants.job_id_col]
- df = df.repartition(1) # XXX: is this line repeated?
+ all_partitions = [
+ CDMSConstants.provider_col,
+ CDMSConstants.project_col,
+ CDMSConstants.platform_code_col,
+ CDMSConstants.geo_spatial_interval_col,
+ CDMSConstants.year_col,
+ CDMSConstants.month_col,
+ CDMSConstants.job_id_col
+ ]
+ # df = df.repartition(1) # XXX: is this line repeated?
df_writer = df.write
LOGGER.debug(f'create partitions')
df_writer = df_writer.partitionBy(all_partitions)
diff --git a/tests/parquet_flask/io_logic/test_ingest_new_file.py
b/tests/parquet_flask/io_logic/test_ingest_new_file.py
index 7aa5bd5..eedf72b 100644
--- a/tests/parquet_flask/io_logic/test_ingest_new_file.py
+++ b/tests/parquet_flask/io_logic/test_ingest_new_file.py
@@ -22,14 +22,19 @@ from parquet_flask.io_logic.ingest_new_file import
get_geospatial_interval
class TestGeneralUtilsV3(TestCase):
def test_get_geospatial_interval(self):
- os.environ['geospatial_interval_by_project'] = json.dumps({
- "ICOADS Release 3.0": 100,
- "SAMOS": "50",
- "t1": "7.5",
- "SPURS": "75"
+ icoads_interval_by_platform = {
+ "0": "50",
+ "16": "7.5"
+ }
+ samos_interval_by_platform = {
+ "0": "50",
+ "17": "7.5"
+ }
+ os.environ['geospatial_interval_by_platform'] = json.dumps({
+ "ICOADS Release 3.0": icoads_interval_by_platform,
+ "SAMOS": samos_interval_by_platform
})
- self.assertEqual(get_geospatial_interval('SAMOS'), 50, 'wrong for
SAMOS')
- self.assertEqual(get_geospatial_interval('SPURS'), 75, 'wrong for
SPURS')
- self.assertEqual(get_geospatial_interval('ICOADS Release 3.0'), 100,
'wrong for ICOADS Release 3.0')
- self.assertEqual(get_geospatial_interval('t1'), 30, 'wrong for t1')
+ self.assertDictEqual(get_geospatial_interval('SAMOS'),
samos_interval_by_platform, 'wrong for SAMOS')
+ self.assertDictEqual(get_geospatial_interval('ICOADS Release 3.0'),
icoads_interval_by_platform, 'wrong for ICOADS Release 3.0')
+ self.assertEqual(get_geospatial_interval('t1'), {}, 'wrong for t1')
return