mpgreg commented on issue #24456: URL: https://github.com/apache/airflow/issues/24456#issuecomment-1788616635
There are two separate PRs here. Actually I haven't made a PR yet, so technically only one. But @mik-laj it would be great if you could take a look at my [decorators](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/decorators/snowpark.py) and [operators](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/operators/snowpark.py) to see if it makes sense to merge with your PR.

At a high level I build from the Python virtualenv operator. I built this when Snowpark only supported Python 3.8 and there were other dependency issues with Airflow. Snowpark now supports 3.8-3.11, but in my experience there will likely always be challenges with major versions, and inheriting from the virtualenv operator simplifies dependency management.

I also created a [SnowparkTable dataclass](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/utils/snowpark_helpers.py#L27) in order to serialize/deserialize Snowpark DataFrames passed to/from a task. This is needed because the DataFrames are tied to a Snowpark Session, which doesn't (or didn't at the time) serialize, so the user can't create a session in the DAG and pass it between tasks. Instead, a new session is created for each task; any SnowparkTable arguments passed to the task are instantiated as Snowpark DataFrames in that session, and any Snowpark DataFrames returned from the task are serialized to Snowflake as tables or stage objects (user's choice). The creation of the Snowpark session and the ser/des of SnowparkTable objects is handled in a new [virtualenv jinja template](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/operators/snowpark_virtualenv_script.jinja2).

Lastly, and somewhat unrelated, I created a new [custom XCOM backend for Snowflake](https://github.com/mpgreg/airflow/blob/main/airflow/providers/snowflake/xcom_backends/snowflake.py).
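As a rough sketch of the SnowparkTable idea, assuming the actual dataclass in `snowpark_helpers.py` differs in detail (the field and method names below are hypothetical): a small, JSON-serializable object carries only the table's location across the task boundary, and each task's freshly created session re-binds it to a live DataFrame.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SnowparkTable:
    """Hypothetical sketch of a serializable stand-in for a Snowpark
    DataFrame. Only the table location crosses the task boundary; each
    task opens its own Snowpark Session and re-binds the table there."""
    name: str
    database: Optional[str] = None
    schema: Optional[str] = None

    def qualified_name(self) -> str:
        # Dotted name suitable for passing to session.table(...)
        return ".".join(p for p in (self.database, self.schema, self.name) if p)
```

Inside each task the generated script would then do something like `df = session.table(arg.qualified_name())` for every SnowparkTable argument.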
Some users (specifically in regulated industries) wanted an option to keep all data in Snowflake, so all task input/output is saved to Snowflake objects (tables or stages) and only a URI is passed to Airflow XCOM:

- `snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?table=<TABLE>&key=<KEY>` or
- `snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?stage=<STAGE>&key=<FILE_PATH>`

Small, JSON-serializable objects are serialized to a single XCOM table in Snowflake with a schema similar to the Airflow XCOM table. Non-JSON-serializable objects, or objects bigger than Snowflake's 16 MB limit, are serialized to a stage.

Operators:
- `SnowparkPythonOperator`: The simplest operator, which runs as a PythonOperator in the Airflow instance. This requires that the Airflow instance runs a version of Python supported by Snowpark and has the Snowpark Python package installed. NOTE: Snowpark currently supports Python 3.8, 3.9, 3.10 and 3.11.
- `SnowparkVirtualenvOperator`: Creates a Python virtualenv to run the Python callable in a subprocess. Users can specify Python package requirements (e.g. `snowflake-snowpark-python`).
- `SnowparkExternalPythonOperator`: Runs the Snowpark Python callable in a pre-existing virtualenv; it is assumed that Snowpark is already installed in that environment. Using the [Astronomer buildkit](https://github.com/astronomer/astro-provider-venv) will simplify building this environment.
- `SnowparkPythonUDFOperator`: (TBD)
- `SnowparkPythonSPROCOperator`: (TBD)

Decorators:
- `snowpark_python_task`: Taskflow decorator for `SnowparkPythonOperator` (i.e. `@task.snowpark_python()`)
- `snowpark_virtualenv_task`: Taskflow decorator for `SnowparkVirtualenvOperator` (i.e. `@task.snowpark_virtualenv()`)
- `snowpark_ext_python_task`: Taskflow decorator for `SnowparkExternalPythonOperator` (i.e. `@task.snowpark_ext_python()`)
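To make the XCOM URI scheme above concrete, here is a minimal stdlib-only sketch of building and parsing such references. The function names are illustrative only, not the backend's actual API, and the sketch assumes exactly one of `table`/`stage` is set per reference, as in the two URI forms shown:

```python
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse

def xcom_uri(account: str, region: str, database: str, schema: str,
             key: str, table: Optional[str] = None,
             stage: Optional[str] = None) -> str:
    """Build a snowflake:// reference of the kind stored in Airflow XCOM.

    Exactly one of `table` (small, JSON-serializable results) or
    `stage` (large or non-serializable results) must be given.
    """
    if (table is None) == (stage is None):
        raise ValueError("exactly one of table or stage is required")
    query = {"table": table, "key": key} if table else {"stage": stage, "key": key}
    return f"snowflake://{account}.{region}/{database}/{schema}?{urlencode(query)}"

def parse_xcom_uri(uri: str) -> dict:
    """Recover the location fields from a stored reference."""
    parts = urlparse(uri)
    account, region = parts.netloc.split(".", 1)
    _, database, schema = parts.path.split("/")
    fields = {k: v[0] for k, v in parse_qs(parts.query).items()}
    return {"account": account, "region": region,
            "database": database, "schema": schema, **fields}
```

The backend's deserialize side would dispatch on whether the parsed reference contains `table` (read the row from the XCOM table) or `stage` (fetch the file from the stage).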
