You could write a versioned bundle that did not use versioned buckets.
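
As a rough illustration of that idea only, here is a minimal sketch in which every published snapshot is copied under its own immutable prefix and the prefix name serves as the bundle version. Everything in it is an assumption made for illustration: the bucket is simulated with a plain dict, and `snapshot_version`, `publish`, `fetch`, and the `versions/<id>/` layout are hypothetical names, not part of Airflow or of any GCS client library.

```python
import hashlib


def snapshot_version(files: dict[str, bytes]) -> str:
    """Derive a deterministic version id from file names and contents."""
    digest = hashlib.sha256()
    for name in sorted(files):
        data = files[name]
        encoded = name.encode()
        # Length-prefix each field so different inputs cannot collide by concatenation.
        digest.update(len(encoded).to_bytes(8, "big"))
        digest.update(encoded)
        digest.update(len(data).to_bytes(8, "big"))
        digest.update(data)
    return digest.hexdigest()[:12]


def publish(bucket: dict[str, bytes], files: dict[str, bytes]) -> str:
    """Copy files under a never-overwritten versions/<id>/ prefix (hypothetical layout)."""
    version = snapshot_version(files)
    for name, data in files.items():
        bucket[f"versions/{version}/{name}"] = data
    return version


def fetch(bucket: dict[str, bytes], version: str) -> dict[str, bytes]:
    """Return exactly the files that were published as the given version."""
    prefix = f"versions/{version}/"
    return {
        key[len(prefix):]: data
        for key, data in bucket.items()
        if key.startswith(prefix)
    }
```

Because old prefixes are never overwritten, a worker asking for an older version reads it directly and nothing has to be restored over the current objects.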

On Sat, Sep 20, 2025 at 1:22 AM Shahar Epstein <[email protected]> wrote:

> To have something to start with, I created a PR for a non-versioned GCS Dag
> Bundle based on the S3 implementation
> https://github.com/apache/airflow/pull/55919
>
> Regarding versioning -
> I'm not sure how it works in S3, but at least in GCS when you "pull down"
> (restore) a specific object's version, you have to overwrite the existing
> one - so we need to think of a workaround here:
> https://cloud.google.com/storage/docs/using-versioned-objects#restore
>
>
> On Fri, Sep 19, 2025 at 3:36 PM Eugen Kosteev <[email protected]> wrote:
>
> > Ha, good point!
> > That is actually the way to go.
> >
> > Thanks Ash.
> >
> > - Eugene
> >
> > On Fri, Sep 19, 2025 at 10:52 AM Ash Berlin-Taylor <[email protected]> wrote:
> >
> > > > The correct fix for this on Airflow 3 is to write a GCS Dag bundle
> > > > backend that uses versioned buckets, so that when a worker requests a
> > > > version to run, the Bundle manager can pull down the specific object
> > > > version out of the bucket, i.e. don't rely on a separate gsutil sync
> > > > process.
> > >
> > > > On 17 Sep 2025, at 12:29, Eugen Kosteev <[email protected]> wrote:
> > > >
> > > > Hello.
> > > >
> > > > > I would like to discuss the following issue that we face in Cloud
> > > > > Composer (and probably others face too).
> > > > > We deploy Airflow components running in separate GKE pods, and DAG
> > > > > files are synced from GCS (Google Cloud Storage) to each component
> > > > > separately. We do not use any NFS-type disks mounted to each
> > > > > component; the DAG files are continuously synced to each pod (i.e.
> > > > > something like "gsutil rsync ..." in a loop).
> > > >
> > > > > Since all components are in such a distributed environment, DAG files
> > > > > can be out of sync between components, and this results in the
> > > > > following issue:
> > > > > 1. new DAG file is synced to DAG processor
> > > > > 2. new DAG is scheduled by scheduler
> > > > > 3. Celery worker starts execution of the task (scheduled DAG) and
> > > > > fails (can't parse file) because DAG file is not yet synced to worker
> > > > > 4. new DAG file is synced to Celery worker
> > > >
> > > > > The parsing of the DAG file in task runner happens here:
> > > > > https://github.com/apache/airflow/blob/eabe6b8dd77204f7c0d117c9d9ad1f4166869671/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L634
> > > >
> > > > > So far, we have tried different hacks to address this issue in Cloud
> > > > > Composer.
> > > >
> > > > *Question:*
> > > > > Would it make sense, and is it possible, to have some retry logic in
> > > > > the "parse" method of the task runner? For example, roughly:
> > > > - DAG is parsed
> > > > *- if DAG is not found -> sleep + retry parsing (loop)*
> > > > *- if timeout reached, exit with message "Dag not found ..."*
> > > > - if DAG is found, continue
> > > >
> > > > > Having any value >0 for the timeout has its own downside: tasks whose
> > > > > DAG files have really disappeared will now take longer to fail.
> > > >
> > > > > The timeout can be configurable, with "0" as the default value, which
> > > > > means that the implementation will be completely backward compatible.
> > > > > Airflow administrators can override this value, knowing that they have
> > > > > the issue described above and the downsides of increasing this
> > > > > timeout.
> > > >
> > > > Any thoughts?
> > > >
> > > > --
> > > > Eugene
> > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
> > >
> > >
> >
> > --
> > Eugene
> >
>
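
For reference, the retry-with-timeout idea from the original message quoted above can be sketched roughly as follows. This is not the task runner's actual code: `parse_with_retry`, `parse_once`, `timeout`, and `interval` are hypothetical names, and the real parse step is stood in for by a callable that is assumed to return None when the Dag is not found.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def parse_with_retry(
    parse_once: Callable[[], Optional[T]],
    timeout: float = 0.0,
    interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call parse_once until it returns a Dag or the timeout (seconds) expires.

    With the default timeout of 0 there is exactly one attempt, which keeps
    the behavior the same as having no retry at all.
    """
    deadline = clock() + timeout
    while True:
        dag = parse_once()
        if dag is not None:
            return dag  # Dag found, continue as usual
        remaining = deadline - clock()
        if remaining <= 0:
            # Timeout reached: give up with the message from the proposal.
            raise LookupError("Dag not found ...")
        # Never sleep past the deadline.
        sleep(min(interval, remaining))
```

The trade-off mentioned in the proposal shows up directly here: a task whose Dag file is genuinely gone only fails after `timeout` seconds have elapsed.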
