This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git


The following commit(s) were added to refs/heads/master by this push:
     new e040f94  Implement parquet partition by platform (#15)
e040f94 is described below

commit e040f949ff5adf487798a0f392eb8e519318be4e
Author: Jason Min-Liang Kang <[email protected]>
AuthorDate: Thu May 11 15:43:11 2023 -0700

    Implement parquet partition by platform (#15)
    
    * Implement parquet partition by platform
    
    * Update changelog
    
    * Update changelog with SDAP ticket number
---
 CHANGELOG.md                                       |  9 ++++
 .../parquet.spark.helm/templates/deployment.yaml   |  4 +-
 k8s_spark/parquet.spark.helm/values.yaml           | 15 ++++---
 parquet_flask/io_logic/cdms_constants.py           |  2 +-
 parquet_flask/io_logic/ingest_new_file.py          | 51 +++++++++++-----------
 .../parquet_flask/io_logic/test_ingest_new_file.py | 23 ++++++----
 6 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 90ce8a3..18fadf2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this 
file.
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+### Added
+- SDAP-463: Added capability to further partition parquet objects/files by 
platform
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
+
 ## [0.3.0] - 2022-07-13
 ### Added
 - CDMS-xxx: Added `CLI` script to ingest S3 data into the Parquet system
diff --git a/k8s_spark/parquet.spark.helm/templates/deployment.yaml 
b/k8s_spark/parquet.spark.helm/templates/deployment.yaml
index 8571ab9..8566caa 100644
--- a/k8s_spark/parquet.spark.helm/templates/deployment.yaml
+++ b/k8s_spark/parquet.spark.helm/templates/deployment.yaml
@@ -88,8 +88,8 @@ spec:
               value: {{ .Values.flask_env.log_level }}
             - name: spark_config_dict
               value: {{ .Values.flask_env.spark_config_dict | toJson | quote  
}}
-            - name: geospatial_interval_by_project
-              value: {{ .Values.flask_env.geospatial_interval_by_project | 
toJson | quote }}
+            - name: geospatial_interval_by_platform
+              value: {{ .Values.flask_env.geospatial_interval_by_platform | 
toJson | quote }}
           {{- if .Values.extraEnvs }}
             {{- toYaml .Values.extraEnvs | nindent 12 }}
           {{- end }}
diff --git a/k8s_spark/parquet.spark.helm/values.yaml 
b/k8s_spark/parquet.spark.helm/values.yaml
index e7098fc..72e5666 100644
--- a/k8s_spark/parquet.spark.helm/values.yaml
+++ b/k8s_spark/parquet.spark.helm/values.yaml
@@ -48,11 +48,16 @@ flask_env:
     "spark.memory.offHeap.size":"512M"
   }
 
-  # The config, geo_spatial_interval_by_project, allows specifying geo-spartial
-  # partition interval/limit at project level. If not specified, 30 is the 
-  # default value.
-  geospatial_interval_by_project: {
-    "ICOADS Release 3.0": "100"
+  # The config, geo_spatial_interval_by_platform, allows specifying 
geo-spatial
+  # partition interval/limit at platform level. The dict is organized like this
+  # {<project>: {<platform_code_01>: <interval>, <platform_code_02>: 
<interval>}}.
+  # If not specified, 30 (which is the value defined in ingest_new_file.py) is
+  # the default value.
+  geospatial_interval_by_platform: {
+    "ICOADS Release 3.0": {
+      "23": "100",
+      "31": "50"
+    }
   }
 
 # AWS EKS IRSA is favored over AWS IAM User credentials when possible
diff --git a/parquet_flask/io_logic/cdms_constants.py 
b/parquet_flask/io_logic/cdms_constants.py
index 28b69d8..ad35d84 100644
--- a/parquet_flask/io_logic/cdms_constants.py
+++ b/parquet_flask/io_logic/cdms_constants.py
@@ -60,4 +60,4 @@ class CDMSConstants:
     max_lon = 'max_lon'
     min_lon = 'min_lon'
 
-    geospatial_interval_by_project = 'geospatial_interval_by_project'
\ No newline at end of file
+    geospatial_interval_by_platform = 'geospatial_interval_by_platform'
\ No newline at end of file
diff --git a/parquet_flask/io_logic/ingest_new_file.py 
b/parquet_flask/io_logic/ingest_new_file.py
index e1239cc..339427b 100644
--- a/parquet_flask/io_logic/ingest_new_file.py
+++ b/parquet_flask/io_logic/ingest_new_file.py
@@ -37,28 +37,23 @@ LOGGER = logging.getLogger(__name__)
 GEOSPATIAL_INTERVAL = 30
 
 
-def get_geospatial_interval(project: str) -> int:
+def get_geospatial_interval(project: str) -> dict:
     """
-    Get geospatial interval from environment variable. If not found, return 
default value.
+    Get geospatial interval dict object from environment variable. If not 
found, return empty dict.
 
     :param project: project name
-    :return: geospatial interval
+    :return: geospatial interval dict
     """
-    interval = GEOSPATIAL_INTERVAL
-    geo_spatial_interval_by_project = 
environ.get(CDMSConstants.geospatial_interval_by_project)
-    if not geo_spatial_interval_by_project:
-        return interval
-    geo_spatial_interval_by_project_dict = 
json.loads(geo_spatial_interval_by_project)
-    if not isinstance(geo_spatial_interval_by_project_dict, dict):
-        return interval
-    if project not in geo_spatial_interval_by_project_dict:
-        return interval
-    try:
-        return int(geo_spatial_interval_by_project_dict[project])
-    except:
-        LOGGER.exception(
-            f'geo_spatial_interval_by_project_dict for {project} is not int: 
{geo_spatial_interval_by_project_dict[project]}')
-        return interval
+    interval_dict = {}
+    geo_spatial_interval_by_platform = 
environ.get(CDMSConstants.geospatial_interval_by_platform)
+    if not geo_spatial_interval_by_platform:
+        return interval_dict
+    geo_spatial_interval_by_platform_dict = 
json.loads(geo_spatial_interval_by_platform)
+    if not isinstance(geo_spatial_interval_by_platform_dict, dict):
+        return interval_dict
+    if project not in geo_spatial_interval_by_platform_dict or not 
isinstance(geo_spatial_interval_by_platform_dict[project], dict):
+        return interval_dict
+    return geo_spatial_interval_by_platform_dict[project]
 
 
 class IngestNewJsonFile:
@@ -97,23 +92,29 @@ class IngestNewJsonFile:
             .withColumn(CDMSConstants.job_id_col, lit(job_id))\
             .withColumn(CDMSConstants.provider_col, lit(provider))\
             .withColumn(CDMSConstants.project_col, lit(project))
-        geospatial_interval = get_geospatial_interval(project)
+        geospatial_interval_dict = get_geospatial_interval(project)
         try:
             df: DataFrame = df.withColumn(
                 CDMSConstants.geo_spatial_interval_col, 
                 pyspark_functions.udf(
-                    lambda latitude, longitude: f'{int(latitude - 
divmod(latitude, geospatial_interval)[1])}_{int(longitude - divmod(longitude, 
geospatial_interval)[1])}',
+                    lambda platform_code, latitude, longitude: f'{int(latitude 
- divmod(latitude, int(geospatial_interval_dict.get(platform_code, 
GEOSPATIAL_INTERVAL)))[1])}_{int(longitude - divmod(longitude, 
int(geospatial_interval_dict.get(platform_code, GEOSPATIAL_INTERVAL)))[1])}',
                     StringType())(
+                        df[CDMSConstants.platform_code_col],
                         df[CDMSConstants.lat_col],
                         df[CDMSConstants.lon_col]))
             df: DataFrame = df.repartition(1)  # combine to 1 data frame to 
increase size
             # .withColumn('ingested_date', 
lit(TimeUtils.get_current_time_str()))
             LOGGER.debug(f'create writer')
-            all_partitions = [CDMSConstants.provider_col, 
CDMSConstants.project_col, CDMSConstants.platform_code_col,
-                              CDMSConstants.geo_spatial_interval_col,
-                              CDMSConstants.year_col, CDMSConstants.month_col,
-                              CDMSConstants.job_id_col]
-            df = df.repartition(1)  # XXX: is this line repeated?
+            all_partitions = [
+                CDMSConstants.provider_col, 
+                CDMSConstants.project_col, 
+                CDMSConstants.platform_code_col,
+                CDMSConstants.geo_spatial_interval_col,
+                CDMSConstants.year_col,
+                CDMSConstants.month_col,
+                CDMSConstants.job_id_col
+            ]
+            # df = df.repartition(1)  # XXX: is this line repeated?
             df_writer = df.write
             LOGGER.debug(f'create partitions')
             df_writer = df_writer.partitionBy(all_partitions)
diff --git a/tests/parquet_flask/io_logic/test_ingest_new_file.py 
b/tests/parquet_flask/io_logic/test_ingest_new_file.py
index 7aa5bd5..eedf72b 100644
--- a/tests/parquet_flask/io_logic/test_ingest_new_file.py
+++ b/tests/parquet_flask/io_logic/test_ingest_new_file.py
@@ -22,14 +22,19 @@ from parquet_flask.io_logic.ingest_new_file import 
get_geospatial_interval
 
 class TestGeneralUtilsV3(TestCase):
     def test_get_geospatial_interval(self):
-        os.environ['geospatial_interval_by_project'] = json.dumps({
-            "ICOADS Release 3.0": 100,
-            "SAMOS": "50",
-            "t1": "7.5",
-            "SPURS": "75"
+        icoads_interval_by_platform = {
+            "0": "50",
+            "16": "7.5" 
+        }
+        samos_interval_by_platform = {
+            "0": "50",
+            "17": "7.5"
+        }
+        os.environ['geospatial_interval_by_platform'] = json.dumps({
+            "ICOADS Release 3.0": icoads_interval_by_platform,
+            "SAMOS": samos_interval_by_platform
         })
-        self.assertEqual(get_geospatial_interval('SAMOS'), 50, 'wrong for 
SAMOS')
-        self.assertEqual(get_geospatial_interval('SPURS'), 75, 'wrong for 
SPURS')
-        self.assertEqual(get_geospatial_interval('ICOADS Release 3.0'), 100, 
'wrong for ICOADS Release 3.0')
-        self.assertEqual(get_geospatial_interval('t1'), 30, 'wrong for t1')
+        self.assertDictEqual(get_geospatial_interval('SAMOS'), 
samos_interval_by_platform, 'wrong for SAMOS')
+        self.assertDictEqual(get_geospatial_interval('ICOADS Release 3.0'), 
icoads_interval_by_platform, 'wrong for ICOADS Release 3.0')
+        self.assertEqual(get_geospatial_interval('t1'), {}, 'wrong for t1')
         return

Reply via email to