This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a2bfc0e62d AIP-47 - Migrate spark DAGs to new design #22439 (#24210)
a2bfc0e62d is described below
commit a2bfc0e62dddb8b4e17d833bdf22d282cb265935
Author: chethanuk-plutoflume <[email protected]>
AuthorDate: Sun Jun 5 10:29:37 2022 +0100
AIP-47 - Migrate spark DAGs to new design #22439 (#24210)
---
.pre-commit-config.yaml | 1 +
airflow/providers/apache/spark/example_dags/__init__.py | 17 -----------------
docs/apache-airflow-providers-apache-spark/index.rst | 2 +-
.../apache-airflow-providers-apache-spark/operators.rst | 6 +++---
.../system/providers/apache/spark}/example_spark_dag.py | 16 ++++++++++++++--
5 files changed, 19 insertions(+), 23 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 041eca2891..46497eed89 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -449,6 +449,7 @@ repos:
^airflow/www/static/|
^airflow/providers/|
^tests/providers/apache/cassandra/hooks/test_cassandra.py$|
+ ^tests/system/providers/apache/spark/example_spark_dag.py$|
^docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst$|
^docs/apache-airflow-providers-apache-hive/commits.rst$|
^airflow/api_connexion/openapi/v1.yaml$|
diff --git a/airflow/providers/apache/spark/example_dags/__init__.py b/airflow/providers/apache/spark/example_dags/__init__.py
deleted file mode 100644
index 217e5db960..0000000000
--- a/airflow/providers/apache/spark/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-apache-spark/index.rst b/docs/apache-airflow-providers-apache-spark/index.rst
index 03f1449cb3..25c1b694d3 100644
--- a/docs/apache-airflow-providers-apache-spark/index.rst
+++ b/docs/apache-airflow-providers-apache-spark/index.rst
@@ -38,7 +38,7 @@ Content
:maxdepth: 1
:caption: Resources
- Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/spark/example_dags>
+ Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/apache/spark>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-spark/>
Installing from sources <installing-providers-from-sources>
diff --git a/docs/apache-airflow-providers-apache-spark/operators.rst b/docs/apache-airflow-providers-apache-spark/operators.rst
index 3fb1353f73..25eb52d52b 100644
--- a/docs/apache-airflow-providers-apache-spark/operators.rst
+++ b/docs/apache-airflow-providers-apache-spark/operators.rst
@@ -40,7 +40,7 @@ Using the operator
Using ``cmd_type`` parameter, is possible to transfer data from Spark to a
database (``spark_to_jdbc``) or from a database to Spark (``jdbc_to_spark``),
which will write the table using the Spark command ``saveAsTable``.
-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_jdbc]
@@ -65,7 +65,7 @@ For parameter definition take a look at :class:`~airflow.providers.apache.spark.
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_sql]
@@ -88,7 +88,7 @@ For parameter definition take a look at :class:`~airflow.providers.apache.spark.
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_submit]
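A minimal, self-contained sketch of the three operators these updated directives document, for readers who want to try them without checking out the moved example; the dag_id, table names, driver class and application path below are placeholders, not values taken from this commit:

from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_operators_sketch",  # placeholder, not the dag_id used in the example
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # cmd_type="jdbc_to_spark" loads a JDBC table into Spark (written via saveAsTable);
    # cmd_type="spark_to_jdbc" writes a metastore table back to the database.
    jdbc_to_spark_job = SparkJDBCOperator(
        task_id="jdbc_to_spark_job",
        cmd_type="jdbc_to_spark",
        jdbc_table="source_table",       # placeholder table names
        metastore_table="target_table",
        jdbc_driver="org.postgresql.Driver",
    )

    sql_job = SparkSqlOperator(
        task_id="sql_job",
        sql="SELECT COUNT(1) FROM target_table",
        master="local",
    )

    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        application="/opt/spark/examples/src/main/python/pi.py",  # placeholder path
    )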
diff --git a/airflow/providers/apache/spark/example_dags/example_spark_dag.py b/tests/system/providers/apache/spark/example_spark_dag.py
similarity index 84%
rename from airflow/providers/apache/spark/example_dags/example_spark_dag.py
rename to tests/system/providers/apache/spark/example_spark_dag.py
index a280d4f3d2..b747e9ae4d 100644
--- a/airflow/providers/apache/spark/example_dags/example_spark_dag.py
+++ b/tests/system/providers/apache/spark/example_spark_dag.py
@@ -20,6 +20,8 @@
Example Airflow DAG to submit Apache Spark applications using
`SparkSubmitOperator`, `SparkJDBCOperator` and `SparkSqlOperator`.
"""
+
+import os
from datetime import datetime
from airflow.models import DAG
@@ -27,8 +29,11 @@ from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperato
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_spark_operator"
+
with DAG(
- dag_id='example_spark_operator',
+ dag_id=DAG_ID,
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
@@ -64,5 +69,12 @@ with DAG(
# [END howto_operator_spark_jdbc]
# [START howto_operator_spark_sql]
- sql_job = SparkSqlOperator(sql="SELECT * FROM bar", master="local", task_id="sql_job")
+ spark_sql_job = SparkSqlOperator(
+ sql="SELECT COUNT(1) as cnt FROM temp_table", master="local",
task_id="spark_sql_job"
+ )
# [END howto_operator_spark_sql]
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
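A usage note rather than part of the diff: the trailing get_test_run(dag) export is what lets pytest discover and execute the moved example as an AIP-47 system test (see tests/system/README.md#run_via_pytest), typically with an invocation along the lines of:

    pytest tests/system/providers/apache/spark/example_spark_dag.py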