This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch aip-62/s3
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit af947f3a7b5ec5f032f45490ec91926bedbc0ad8
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Tue Jul 9 14:39:44 2024 +0200

    AIP-62 support for S3Hook and S3 usage of ObjectStore

    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/amazon/aws/hooks/s3.py         | 19 ++++++++++++++++++-
 airflow/providers/common/io/datasets/__init__.py | 16 ++++++++++++++++
 airflow/providers/common/io/datasets/file.py     | 24 ++++++++++++++++++++++++
 airflow/providers/common/io/provider.yaml        |  4 ++++
 4 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 8ca93766e2..a30959247e 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -41,6 +41,8 @@ from typing import TYPE_CHECKING, Any, AsyncIterator, Callable
 from urllib.parse import urlsplit
 from uuid import uuid4
 
+from airflow.lineage.hook import get_hook_lineage_collector
+
 if TYPE_CHECKING:
     from mypy_boto3_s3.service_resource import Bucket as S3Bucket, Object as S3ResourceObject
 
@@ -1111,6 +1113,9 @@ class S3Hook(AwsBaseHook):
 
         client = self.get_conn()
         client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args, Config=self.transfer_config)
+        get_hook_lineage_collector().add_output_dataset(
+            {"scheme": "s3", "bucket_name": bucket_name, "key": key}, self
+        )
 
     @unify_bucket_name_and_key
     @provide_bucket_name
@@ -1251,6 +1256,9 @@ class S3Hook(AwsBaseHook):
             ExtraArgs=extra_args,
             Config=self.transfer_config,
         )
+        get_hook_lineage_collector().add_output_dataset(
+            {"scheme": "s3", "bucket_name": bucket_name, "key": key}, self
+        )
 
     def copy_object(
         self,
@@ -1306,6 +1314,12 @@ class S3Hook(AwsBaseHook):
         response = self.get_conn().copy_object(
             Bucket=dest_bucket_name, Key=dest_bucket_key, CopySource=copy_source, **kwargs
         )
+        get_hook_lineage_collector().add_input_dataset(
+            {"scheme": "s3", "bucket_name": source_bucket_name, "key": source_bucket_key}, self
+        )
+        get_hook_lineage_collector().add_output_dataset(
+            {"scheme": "s3", "bucket_name": dest_bucket_name, "key": dest_bucket_key}, self
+        )
         return response
 
     @provide_bucket_name
@@ -1425,6 +1439,7 @@ class S3Hook(AwsBaseHook):
 
             file_path.parent.mkdir(exist_ok=True, parents=True)
 
+            get_hook_lineage_collector().add_output_dataset({"scheme": "file", "path": ""}, self)
             file = open(file_path, "wb")
         else:
             file = NamedTemporaryFile(dir=local_path, prefix="airflow_tmp_", delete=False)  # type: ignore
@@ -1435,7 +1450,9 @@ class S3Hook(AwsBaseHook):
             ExtraArgs=self.extra_args,
             Config=self.transfer_config,
         )
-
+        get_hook_lineage_collector().add_input_dataset(
+            {"scheme": "s3", "bucket_name": bucket_name, "key": key}, self
+        )
         return file.name
 
     def generate_presigned_url(
diff --git a/airflow/providers/common/io/datasets/__init__.py b/airflow/providers/common/io/datasets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/common/io/datasets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/common/io/datasets/file.py b/airflow/providers/common/io/datasets/file.py
new file mode 100644
index 0000000000..46c7499037
--- /dev/null
+++ b/airflow/providers/common/io/datasets/file.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.datasets import Dataset
+
+
+def create_dataset(*, path: str) -> Dataset:
+    # We assume that we get absolute path starting with /
+    return Dataset(uri=f"file://{path}")
diff --git a/airflow/providers/common/io/provider.yaml b/airflow/providers/common/io/provider.yaml
index aa3c2ac901..0270bfaa53 100644
--- a/airflow/providers/common/io/provider.yaml
+++ b/airflow/providers/common/io/provider.yaml
@@ -51,6 +51,10 @@ operators:
 xcom:
   - airflow.providers.common.io.xcom.backend
 
+dataset-uris:
+  - schemes: [file]
+    factory: airflow.providers.common.io.datasets.file.create_dataset
+
 config:
   common.io:
     description: Common IO configuration section
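
For context on the new factory: create_dataset simply prefixes the given absolute
path with the file:// scheme, and the dataset-uris entry in provider.yaml registers
it as the URI factory for that scheme. A minimal sketch of the resulting behavior
(the example path is hypothetical):

    from airflow.providers.common.io.datasets.file import create_dataset

    # The factory assumes an absolute path starting with "/".
    dataset = create_dataset(path="/tmp/output.csv")
    assert dataset.uri == "file:///tmp/output.csv"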

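The S3Hook changes all follow one pattern: after a transfer completes, the hook
reports the object it read or wrote to the hook lineage collector. A rough sketch
of that pattern applied to a hypothetical custom hook, assuming the
add_input_dataset/add_output_dataset signatures used in this commit (dataset
kwargs dict first, hook instance second):

    from airflow.hooks.base import BaseHook
    from airflow.lineage.hook import get_hook_lineage_collector


    class MyS3CopyHook(BaseHook):  # hypothetical hook, for illustration only
        def copy(self, src_bucket: str, src_key: str, dst_bucket: str, dst_key: str) -> None:
            # ... perform the actual copy here ...
            collector = get_hook_lineage_collector()
            # Report what was read and what was written, mirroring S3Hook.copy_object above.
            collector.add_input_dataset({"scheme": "s3", "bucket_name": src_bucket, "key": src_key}, self)
            collector.add_output_dataset({"scheme": "s3", "bucket_name": dst_bucket, "key": dst_key}, self)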