aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r672874910



##########
File path: examples/spark-ml-app-demo/k8s/serving.py
##########
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+_PETAL_WIDTH = 'petal_width'
+
+
+def predict(input_json):
+    print(f'input_json={input_json}')
+    input_dict = json.loads(input_json)
+    model, version = _MODEL_STORE.load_latest_model()
+    result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
+    print(f'result={result}')
+    return result
+
+
+def healthcheck(self):

Review comment:
       Align with changes on master

##########
File path: examples/aws-ml-app-demo/serving.py
##########
@@ -8,28 +25,13 @@
 
 
 def predict(input_json):
-    try:
-        input_dict = json.loads(input_json)
-        model, version = _MODEL_STORE.load_latest_model()
-        result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
-        return json.dumps({"result": result, "version": version})
-
-    except IndexError:
-        return 'Failure: the model is not ready yet'
-
-    except Exception as e:
-        print(e)
-        return 'Failure'
+    print(f'input_json={input_json}')

Review comment:
       Why was this change made?

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -111,26 +210,28 @@ def test_partially_missing_spark_arguments(self):
                     '--class',
                     'org.apache.liminal.MySparkApp',
                     '--conf',
-                    'spark.driver.memory=1g',
-                    '--conf',
                     'spark.driver.maxResultSize=1g',
                     '--conf',
+                    'spark.driver.memory=1g',
+                    '--conf',
                     'spark.yarn.executor.memoryOverhead=500M',
                     'my_app.py',
                     '--query',
                     'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
                     "unified_Date_prt >= '{{yesterday_ds}}'",
                     '--output',
-                    'mytable'].sort()
+                    'mytable']
 
         actual = SparkTask(
             'my_spark_task',
             DummyDag('dag-id', 'my_spark_task'),
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual.sort(), expected)
+        print(actual)

Review comment:
       Use logging instead of printing

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'

Review comment:
       Shouldn't this be an S3 path?

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'
+  application: data_cleanup.py
+volumes:
+  - volume: gettingstartedvol
+    claim_name: gettingstartedvol-pvc
+    local:
+      path: .
+services:
+  - service:
+    name: my_datascience_server
+    type: python_server
+    description: my ds server
+    image: myorg/mydatascienceapp
+    source: .
+    endpoints:
+      - endpoint: /predict
+        module: serving
+        function: predict
+      - endpoint: /healthcheck
+        module: serving
+        function: healthcheck
+task_defaults:
+  python:
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+pipelines:
+  - pipeline: my_first_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: data_preprocessing
+        type: spark
+        description: prepare the data for training
+        application_arguments:
+          - 'data/iris.csv'

Review comment:
       Shouldn't this be an S3 path when the EMR executor is used? Perhaps you can add a prefix here, defined by a variable in the parent.

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -57,26 +154,27 @@ def test_get_runnable_command(self):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
         self.assertEqual(actual.sort(), expected.sort())
 
+
     def test_missing_spark_arguments(self):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
+                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where"
+                           " unified_Date_prt >= '{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where "

Review comment:
       Can we make the table, column, and database names more generic? It looks like this was lifted from a fork.
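
For illustration, the test data could look roughly like this. The names `my_database`, `my_table` and `my_date_column` are placeholders chosen for this sketch, and any neutral identifiers would do:

```python
# Placeholder identifiers (assumptions for this sketch, not taken from the PR).
QUERY = (
    "select * from my_database.my_table where "
    "my_date_column >= '{{yesterday_ds}}'"
)

task_config = {
    'application_source': 'my_app.py',
    'application_arguments': {
        '--query': QUERY,
        '--output': 'mytable',
    },
}

# Expected command, reusing the same constant so the two cannot drift apart.
expected = ['spark-submit', 'my_app.py', '--query', QUERY, '--output', 'mytable']
```

Separately, `list.sort()` sorts in place and returns `None`, so `assertEqual(actual.sort(), expected.sort())` compares `None` with `None` and always passes. Comparing `actual` with `expected` directly, or `sorted(actual)` with `sorted(expected)`, checks the actual contents.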

##########
File path: examples/aws-ml-app-demo/liminal.yml
##########
@@ -20,17 +37,14 @@ services:
       - endpoint: /healthcheck
         module: serving
         function: healthcheck
-      - endpoint: /version

Review comment:
       Why was this endpoint deleted?

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'
+executors:
+  - executor: emr-executor
+    cluster_name: '{{cluster_name}}'
+    type: emr
+  - executor: k8s-executor
+    type: kubernetes
+task_defaults:
+  spark:
+    executor: emr-executor
+    master: yarn
+    deploy_mode: cluster
+    application_source: 'myorg/{{application}}'

Review comment:
       Shouldn't we have an S3 path here? We could build one from a configurable bucket name.
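
As a rough sketch of the idea, assuming the bucket name comes from a variable: the `s3_uri` helper below is hypothetical and only shows how a bucket and a key would combine into a full path.

```python
def s3_uri(bucket, key):
    """Join a bucket name and an object key into an s3:// URI."""
    # Strip stray slashes so 'myorg/' and '/data_cleanup.py' still join cleanly.
    return 's3://{}/{}'.format(bucket.strip('/'), key.lstrip('/'))
```

With a bucket variable set to `myorg`, `s3_uri('myorg', 'data_cleanup.py')` gives `s3://myorg/data_cleanup.py`, which is the kind of value `application_source` would then hold.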

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -15,17 +15,108 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import os
+import tempfile
 from unittest import TestCase
 
+from liminal.build import liminal_apps_builder
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow import DummyDag
+from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
 from liminal.runners.airflow.tasks.spark import SparkTask
+from tests.util import dag_test_utils
 
 
 class TestSparkTask(TestCase):
     """
     Test Spark Task
     """
 
+    _VOLUME_NAME = 'myvol1'
+
+    def test_spark_on_k8s(self):

Review comment:
       Let's add a similar test for the Spark image builder: without Airflow, but with Docker.

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'

Review comment:
       Suggest rename `outputs` to `my_first_liminal_spark_app_outputs`

##########
File path: liminal/build/image/spark/spark.py
##########
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.build.python import BasePythonImageBuilder
+
+
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+
+        if "" in data:

Review comment:
       The condition should be whether a requirements file exists.
   But since this uses the Python image builder as a parent, I suppose it always has a `requirements.txt` file? If so, we can always do the pip install in the Dockerfile and don't need a condition.
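
If the condition is kept, a sketch of a file-existence check could look like this. The function name, the `source_dir` parameter and the exact `RUN` line are assumptions for illustration, not the builder's actual API:

```python
import os

# Assumed Dockerfile instruction; the real Dockerfile may word this differently.
_PIP_INSTALL = 'RUN pip install -r requirements.txt'


def add_pip_install(dockerfile_text, source_dir):
    """Append a pip install step only if source_dir contains requirements.txt."""
    if os.path.isfile(os.path.join(source_dir, 'requirements.txt')):
        return dockerfile_text.rstrip('\n') + '\n' + _PIP_INSTALL + '\n'
    # No requirements file: leave the Dockerfile untouched.
    return dockerfile_text
```

If the parent builder does guarantee a `requirements.txt`, the check goes away and the install line can live in the Dockerfile itself.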




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

