mpgreg commented on issue #24456:
URL: https://github.com/apache/airflow/issues/24456#issuecomment-1788616635

   There are two separate PRs here. Actually, I haven't made a PR yet, so technically only one. But @mik-laj it would be great if you could take a look at my [decorators](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/decorators/snowpark.py) and [operators](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/operators/snowpark.py) to see if it makes sense to merge them with your PR.
   
   At a high level, I built on top of the Python virtualenv operator. I originally wrote this when Snowpark only supported Python 3.8 and there were other dependency conflicts with Airflow. Snowpark now supports Python 3.8-3.11, but in my experience there will likely always be challenges with major versions, and inheriting from the virtualenv operator simplifies dependency management.
   
   I also created a [SnowparkTable dataclass](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/utils/snowpark_helpers.py#L27) in order to serialize/deserialize Snowpark Dataframes passed to/from a task. This is needed because Dataframes are bound to a Snowpark Session, which doesn't (or didn't at the time) serialize, so the user can't create a session in the DAG and pass it between tasks. Instead, a new session is created for each task: any SnowparkTable arguments passed to the task are instantiated as Snowpark Dataframes in that session, and any Snowpark Dataframes returned from the task are serialized to Snowflake as tables or stage objects (user's choice).
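Since such a dataclass only needs enough metadata to re-create the Dataframe inside the task's own session, a minimal sketch might look like the following (the field names here are my assumptions, not the actual implementation):

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical sketch of a SnowparkTable dataclass: it carries only the
# metadata needed to re-create a Snowpark Dataframe in a fresh session.
@dataclass
class SnowparkTable:
    name: str                       # table name, optionally qualified
    database: Optional[str] = None  # falls back to the connection default
    schema: Optional[str] = None
    conn_id: Optional[str] = None   # Airflow connection to resolve against

    def qualified_name(self) -> str:
        # Join whichever qualifiers are set, e.g. "DB.PUBLIC.SALES"
        return ".".join(p for p in (self.database, self.schema, self.name) if p)
```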
   
   The creation of the Snowpark session and the ser/des of SnowparkTable objects is handled in a new [virtualenv jinja template](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/operators/snowpark_virtualenv_script.jinja2).
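The flow the generated script follows can be sketched in a self-contained way like this (all names are my assumptions, and the stubs stand in for the real Snowpark classes): open a fresh session, swap SnowparkTable arguments for live Dataframes, run the callable, and persist any returned Dataframe.

```python
class SnowparkTable:                         # stand-in for the real dataclass
    def __init__(self, name: str):
        self.name = name

def run_in_new_session(user_fn, args, session, result_table="XCOM_RESULT"):
    """Sketch of the per-task wrapper the jinja template generates."""
    # Hydrate: SnowparkTable -> Dataframe bound to *this* task's session
    hydrated = [session.table(a.name) if isinstance(a, SnowparkTable) else a
                for a in args]
    result = user_fn(*hydrated)
    # Dehydrate: persist a returned Dataframe and hand a reference downstream
    if hasattr(result, "write"):             # duck-typed Snowpark Dataframe
        result.write.save_as_table(result_table, mode="overwrite")
        return SnowparkTable(result_table)
    return result                            # plain values pass through
```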
 
   
   Lastly, and somewhat unrelated, I created a new [custom XCom backend for Snowflake](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/xcom_backends/snowflake.py). Some users (particularly in regulated industries) want the option to keep all data in Snowflake, so all task input/output is saved to Snowflake objects (tables or stages) and only a URI (snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?table=<TABLE>&key=<KEY> or snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?stage=<STAGE>&key=<FILE_PATH>) is passed to Airflow XCom. Small, JSON-serializable objects are serialized to a single XCom table in Snowflake with a schema similar to the Airflow XCom table. Non-JSON-serializable objects, or objects larger than Snowflake's 16 MB limit, are serialized to a stage.
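The backend's routing decision can be illustrated with a small sketch (function and object names like `XCOM_TABLE` and `XCOM_STAGE` are placeholders, not the actual code): JSON-serializable values under the size limit go to the table, everything else to a stage, and only the URI reaches Airflow's XCom.

```python
import json

SNOWFLAKE_MAX_JSON_BYTES = 16 * 1024 * 1024   # Snowflake's 16 MB limit

def xcom_uri_for(value, account, region, database, schema, key):
    """Sketch of the table-vs-stage routing; only this URI is stored in XCom."""
    base = f"snowflake://{account}.{region}/{database}/{schema}"
    try:
        payload = json.dumps(value)
        if len(payload.encode()) <= SNOWFLAKE_MAX_JSON_BYTES:
            return f"{base}?table=XCOM_TABLE&key={key}"
    except TypeError:
        pass  # not JSON-serializable -> fall through to stage
    return f"{base}?stage=XCOM_STAGE&key={key}"
```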
   
   Operators
     - `SnowparkPythonOperator`: The simplest operator; it runs as a PythonOperator in the Airflow instance. This requires that the Airflow instance runs a version of Python supported by Snowpark and has the Snowpark Python package installed. NOTE: Snowpark currently supports Python 3.8 through 3.11.
     - `SnowparkVirtualenvOperator`: This operator creates a Python virtualenv to run the Python callable in a subprocess. Users can specify Python package requirements (e.g. snowflake-snowpark-python).
     - `SnowparkExternalPythonOperator`: This operator runs the Snowpark Python callable in a pre-existing virtualenv, in which Snowpark is assumed to be already installed. Using the [Astronomer buildkit](https://github.com/astronomer/astro-provider-venv) simplifies building this environment.
     - `SnowparkPythonUDFOperator`: (TBD) 
     - `SnowparkPythonSPROCOperator`: (TBD)
    
   Decorators
     - `snowpark_python_task`: Taskflow decorator for SnowparkPythonOperator (i.e. `@task.snowpark_python()`)
     - `snowpark_virtualenv_task`: Taskflow decorator for SnowparkVirtualenvOperator (i.e. `@task.snowpark_virtualenv()`)
     - `snowpark_ext_python_task`: Taskflow decorator for SnowparkExternalPythonOperator (i.e. `@task.snowpark_ext_python()`)
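As a rough, pure-Python illustration of how a taskflow decorator binds a callable to its operator class (this is a toy stand-in, not the actual Airflow taskflow machinery, and `FakeOperator` is invented for the example):

```python
class FakeOperator:                          # illustrative operator stub
    def __init__(self, python_callable, op_args, requirements=None):
        self.python_callable = python_callable
        self.op_args = op_args
        self.requirements = requirements or []

def snowpark_virtualenv_task(**operator_kwargs):
    """Toy sketch of the decorator factory behind @task.snowpark_virtualenv()."""
    def decorator(fn):
        def make_task(*args):
            # Calling the decorated function builds an operator instance
            return FakeOperator(python_callable=fn, op_args=list(args),
                                **operator_kwargs)
        return make_task
    return decorator

@snowpark_virtualenv_task(requirements=["snowflake-snowpark-python"])
def transform(x):
    return x * 2
```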


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
