bolkedebruin commented on PR #34729:
URL: https://github.com/apache/airflow/pull/34729#issuecomment-1749091768
@uranusjr
> I see what you mean. The two do have some similarities though, say I want
to trigger a(nother) DAG when a file on S3 is modified, I would write something
like this:
>
> ```python
> inp = fs.mount("file://my-input.csv")
> out = fs.mount("s3://my-warehouse/file.csv")
>
> out_ds = Dataset("s3://my-warehouse/file.csv")
>
> @task(outlets=[out_ds])
> def upload(source, target):
>     with fs.open(target, "w") as f:
>         f.write(source.read())
>
> upload(inp, out)
> ```
>
> but the fact I need to manually tell Airflow explicitly what is done in
the task seems a bit awkward, and it seems like Airflow should be able to just
do the right thing. But admittedly I haven’t figured out how exactly Airflow
should infer `out` should be triggered (but not `inp`!) so maybe that’s
premature; we can always figure that out later.
There seems to be a misconception here. Typically, you would not mount a
file, but a filesystem. So not `file:///my-input.csv` but more like
`file:///where/my-input/lives`. In that case you would end up with:
```python
local_fs = fs.mount("file:///data")
remote_fs = fs.mount("s3://my-warehouse")

out_ds = Dataset(remote_fs / "file.csv")

@task(outlets=[out_ds])
def upload(source, target):
    with fs.open(target, "w") as f:
        f.write(source.read())

# the more optimized version of this looks like this btw
@task(outlets=[out_ds])
def upload(source, target):
    # fs.copy figures out the most efficient way of copying
    fs.copy(source, target)
```
Being able to deal with Dataset parameters should maybe be on the list, although I am not entirely happy with Datasets being defined at parse time rather than at runtime.