Thanks TP, I left a bunch of comments in the AIP doc.
But overall, this looks really good.

Vikram

On Mon, Jul 29, 2024 at 2:23 PM Kaxil Naik <kaxiln...@gmail.com> wrote:

> For example: Partitioning the Data:
>
> @asset(..., schedule="@daily", partition=PartitionByInternal("@daily"))
> def daily_sales_data():
>     ...
>
>
> Incremental processing within these partitions might look something like
> this, where we store the watermark in a Variable:
>
> def fetch_data_from_gcs(last_processed_timestamp):
>       ...
>       blobs = bucket.list_blobs()
>
>
>       # Fetch files updated after last_processed_timestamp
>       new_data = []
>       for blob in blobs:
>         if blob.updated > last_processed_timestamp:
>             new_data.append(blob)
>
>       return new_data
>
>
> @asset(schedule="@daily", partition=PartitionByInternal("@daily"))
> def daily_sales_data():
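>       # Fetch the last processed timestamp (the watermark) from an Airflow
>       # Variable. The default is timezone-aware (needs `timezone` imported
>       # from datetime) so it can be compared with blob.updated, which is
>       # timezone-aware.
>       last_processed_timestamp_dt = Variable.get(
>           "last_processed_timestamp",
>           default_var=datetime(2024, 7, 29, tzinfo=timezone.utc).isoformat(),
>       )
>       last_processed_timestamp = datetime.fromisoformat(last_processed_timestamp_dt)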
>
>       # Fetch new or updated data from GCS
>       new_data = fetch_data_from_gcs(last_processed_timestamp)
>
>
>       if new_data:
>           # Process the new data
>           process_partition(new_data)
>
>
>           # Update the watermark to the latest timestamp from the new data
>           latest_timestamp = max(blob.updated for blob in new_data)
>           Variable.set("last_processed_timestamp", latest_timestamp.isoformat())
>
>
>
>
> On Mon, 29 Jul 2024 at 19:32, Daniel Standish
> <daniel.stand...@astronomer.io.invalid> wrote:
>
> > >
> > > We should clarify in the AIP doc that the proposed partitioning feature
> > > is not designed specifically to handle incremental loads in the
> > > traditional sense. Instead, it is intended to manage and process data in
> > > defined segments or partitions.
> >
> >
> > Agree.
> >
> >
> > > However, partitions can be used in conjunction with incremental loading
> > > strategies. For example, a time-based partitioning scheme can ensure
> > > that only data from relevant time periods is processed,
> > > *and within those partitions, incremental updates can be tracked and
> > > processed.*
> >
> >
> > I'm not sure what you mean by this, particularly the bit I emphasized.
> > Can you try to clarify?
> >
> >
> > On Mon, Jul 29, 2024 at 11:01 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >
> > > Yeah, TP and I discussed that we aren't solving the incremental load
> > > problem here. Folks can still achieve it, similar to how you did, by
> > > storing the watermark in Variables, and we can support it natively with
> > > a revised AIP-30 in one of the minor releases for Airflow 3.
> > >
> > > We should clarify in the AIP doc that the proposed partitioning feature
> > > is not designed specifically to handle incremental loads in the
> > > traditional sense. Instead, it is intended to manage and process data in
> > > defined segments or partitions.
> > >
> > > However, partitions can be used in conjunction with incremental loading
> > > strategies. For example, a time-based partitioning scheme can ensure
> > > that only data from relevant time periods is processed, and within those
> > > partitions, incremental updates can be tracked and processed.
> > >
> > > Regards,
> > > Kaxil
> > >
> > >
> > >
> > > On Mon, 29 Jul 2024 at 18:00, Daniel Standish
> > > <daniel.stand...@astronomer.io.invalid> wrote:
> > >
> > > > Hi,
> > > >
> > > > *1. incremental loads*
> > > >
> > > > There is mention of incremental processing / incremental loads in the
> > > > doc.
> > > >
> > > > E.g.
> > > >
> > > > > This is particularly useful for large datasets that need to be
> > > > > processed incrementally or updated periodically.
> > > >
> > > >
> > > > And
> > > >
> > > > > Facilitating Incremental Processing: Many modern data processing
> > > > > strategies rely on incremental updates
> > > >
> > > >
> > > > But there are no examples re how this solves for that use case.
> > > >
> > > > I think it's actually not good to think of or talk about incremental
> > > > loads as "partitioned".
> > > >
> > > > Let me explain.
> > > >
> > > > An incremental load might track an `updated_at` column.  The data it
> > > > processes is the data with an updated `updated_at` column.  But you
> > > > would not be correct in calling this a partition of data.  Because when
> > > > the data is updated again, it would now be in another partition.
> > > > That's not what partitioning is.
> > > >
> > > > If this is supposed to solve for incremental loads, I think an example
> > > > is needed.  If it's not, let's call it out explicitly and say this is
> > > > not solving for incremental loads.
> > > >
> > > > *2. support for tasks*
> > > >
> > > > I see this is specific to tasks defined with the asset syntax.  What's
> > > > the story with "normal" dags and tasks, e.g. with task flow or classic
> > > > operators?  Is this AIP adding support only for assets?  Is there some
> > > > plan for that?
> > > >
> > >
> >
>
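
The distinction drawn in the thread (a partition key is stable, while an
incremental load follows a moving watermark) can be sketched in plain Python.
This is only an illustration and uses nothing from Airflow or the AIP: the
Row class, the sale_date and updated_at fields, and the helper functions are
all hypothetical names, and the per-partition watermark dict is one assumed
way to read "incremental updates within a partition".

```python
from dataclasses import dataclass
from datetime import date, datetime, timezone


@dataclass
class Row:
    id: int
    sale_date: date       # immutable business date: usable as a partition key
    updated_at: datetime  # changes on every edit: only usable as a watermark


def utc(*args: int) -> datetime:
    """Build a timezone-aware UTC datetime."""
    return datetime(*args, tzinfo=timezone.utc)


EPOCH = datetime.min.replace(tzinfo=timezone.utc)


def partition_key(row: Row) -> str:
    """Daily partition key: depends only on the immutable sale_date."""
    return row.sale_date.isoformat()


def rows_since(rows: list[Row], watermark: datetime) -> list[Row]:
    """Incremental load: every row modified after the watermark."""
    return [r for r in rows if r.updated_at > watermark]


def pending_by_partition(
    rows: list[Row], watermarks: dict[str, datetime]
) -> dict[str, list[Row]]:
    """Group rows that are newer than their own partition's watermark."""
    pending: dict[str, list[Row]] = {}
    for r in rows:
        key = partition_key(r)
        if r.updated_at > watermarks.get(key, EPOCH):
            pending.setdefault(key, []).append(r)
    return pending


# A row's partition does not change when the row is edited...
row1 = Row(id=1, sale_date=date(2024, 7, 28), updated_at=utc(2024, 7, 28, 9))
key_before = partition_key(row1)
watermark = utc(2024, 7, 29)
picked_before = rows_since([row1], watermark)

row1.updated_at = utc(2024, 7, 30, 12)  # the row is edited later
key_after = partition_key(row1)

# ...but the same row now falls into a different incremental batch.
picked_after = rows_since([row1], watermark)

# Combining both: one watermark per partition.
row2 = Row(id=2, sale_date=date(2024, 7, 29), updated_at=utc(2024, 7, 29, 8))
watermarks = {"2024-07-28": utc(2024, 7, 29), "2024-07-29": utc(2024, 7, 29, 10)}
pending = pending_by_partition([row1, row2], watermarks)
```

Here key_before and key_after are the same, while picked_before and
picked_after differ, which is the reason an `updated_at` window is not a
partition. Keeping one watermark per partition key, rather than a single
global one, is a design choice of this sketch and not something the thread
prescribes.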
