This is an automated email from the ASF dual-hosted git repository.
basph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new de99cd97fe Fix formatting leftovers (#27750)
de99cd97fe is described below
commit de99cd97fe0a83dc0479f78db0049cf7a8d43943
Author: Bolke de Bruin <[email protected]>
AuthorDate: Thu Nov 17 18:22:22 2022 +0100
Fix formatting leftovers (#27750)
PR 27540 left some formatting issues which weren't caught.
---
airflow/models/xcom.py | 2 +-
docs/apache-airflow/concepts/taskflow.rst | 59 ++++++++++++++++---------------
2 files changed, 32 insertions(+), 29 deletions(-)
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 83fb575cf4..9aea200a2c 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -629,7 +629,7 @@ class BaseXCom(Base, LoggingMixin):
" then you need to enable pickle support for XCom"
" in your airflow config or make sure to decorate your"
" object with attr.",
- ex
+ ex,
)
raise
diff --git a/docs/apache-airflow/concepts/taskflow.rst b/docs/apache-airflow/concepts/taskflow.rst
index cc0030c9ed..7efa38b843 100644
--- a/docs/apache-airflow/concepts/taskflow.rst
+++ b/docs/apache-airflow/concepts/taskflow.rst
@@ -104,39 +104,42 @@ a ``Dataset``, which is ``@attr.define`` decorated,
together with TaskFlow.
from airflow import Dataset
from airflow.decorators import dag, task
- SRC = Dataset("https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json")
+ SRC = Dataset(
+ "https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
+ )
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
- @task()
- def retrieve(src: Dataset) -> dict:
- resp = requests.get(url=src.uri)
- data = resp.json()
- return data["data"]
-
- @task()
- def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
- ret: dict[int, float] = {}
- for year, celsius in temps.items():
- ret[year] = float(celsius)*1.8+32
-
- return ret
-
- @task()
- def load(fahrenheit: dict[int, float]) -> Dataset:
- filename = "/tmp/fahrenheit.json"
- s = json.dumps(fahrenheit)
- f = open(filename, "w")
- f.write(s)
- f.close()
-
- return Dataset(f"file:///{filename}")
-
- data = retrieve(SRC)
- fahrenheit = to_fahrenheit(data)
- load(fahrenheit)
+ @task()
+ def retrieve(src: Dataset) -> dict:
+ resp = requests.get(url=src.uri)
+ data = resp.json()
+ return data["data"]
+
+ @task()
+ def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
+ ret: dict[int, float] = {}
+ for year, celsius in temps.items():
+ ret[year] = float(celsius) * 1.8 + 32
+
+ return ret
+
+ @task()
+ def load(fahrenheit: dict[int, float]) -> Dataset:
+ filename = "/tmp/fahrenheit.json"
+ s = json.dumps(fahrenheit)
+ f = open(filename, "w")
+ f.write(s)
+ f.close()
+
+ return Dataset(f"file:///{filename}")
+
+ data = retrieve(SRC)
+ fahrenheit = to_fahrenheit(data)
+ load(fahrenheit)
+
etl()