For example, partitioning the data:

@asset(..., schedule="@daily", partition=PartitionByInterval("@daily"))
def daily_sales_data():
    ...
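
To make "daily partition" concrete, here is a minimal sketch in plain Python,
independent of the proposed API. It assumes a daily partition covers one
half-open UTC day [start, end); `daily_partition_window` is a hypothetical
helper for illustration, not something the AIP defines.

```python
from datetime import datetime, timedelta, timezone


def daily_partition_window(ts: datetime) -> tuple[datetime, datetime]:
    """Return the half-open [start, end) UTC day that contains ts."""
    ts = ts.astimezone(timezone.utc)
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)


# A timestamp on the evening of 29 July falls in the 29 July partition.
start, end = daily_partition_window(
    datetime(2024, 7, 29, 19, 32, tzinfo=timezone.utc)
)
print(start.isoformat(), end.isoformat())
```

The point is only that a record's partition is fixed by the day its timestamp
falls in; it does not change when the record is processed.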


Incremental processing within these partitions might look something like
this, where we store the watermark in an Airflow Variable:

def fetch_data_from_gcs(last_processed_timestamp):
    ...
    blobs = bucket.list_blobs()

    # Keep only files updated after last_processed_timestamp
    new_data = []
    for blob in blobs:
        if blob.updated > last_processed_timestamp:
            new_data.append(blob)

    return new_data


from datetime import datetime, timezone

from airflow.models import Variable


@asset(schedule="@daily", partition=PartitionByInterval("@daily"))
def daily_sales_data():
    # Fetch the last processed timestamp (the watermark) from an Airflow
    # Variable. The default is timezone-aware (UTC) because blob.updated is
    # timezone-aware, and naive and aware datetimes cannot be compared.
    last_processed_timestamp_str = Variable.get(
        "last_processed_timestamp",
        default_var=datetime(2024, 7, 29, tzinfo=timezone.utc).isoformat(),
    )
    last_processed_timestamp = datetime.fromisoformat(last_processed_timestamp_str)

    # Fetch new or updated data from GCS
    new_data = fetch_data_from_gcs(last_processed_timestamp)

    if new_data:
        # Process the new data
        process_partition(new_data)

        # Update the watermark to the latest timestamp from the new data
        latest_timestamp = max(blob.updated for blob in new_data)
        Variable.set("last_processed_timestamp", latest_timestamp.isoformat())
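
The watermark logic above can be tried without Airflow or GCS. The following
is a minimal sketch under stated assumptions: `FakeBlob`, `InMemoryVariable`
and `run_once` are hypothetical stand-ins for GCS blobs, the Airflow Variable
store and one scheduled run; none of them are part of Airflow or the AIP.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class FakeBlob:
    """Hypothetical stand-in for a GCS blob with an `updated` timestamp."""
    name: str
    updated: datetime


class InMemoryVariable:
    """Hypothetical stand-in for the Airflow Variable store."""
    _store: dict = {}

    @classmethod
    def get(cls, key, default_var=None):
        return cls._store.get(key, default_var)

    @classmethod
    def set(cls, key, value):
        cls._store[key] = value


def run_once(blobs):
    """One run: pick up blobs newer than the watermark, then advance it."""
    default = datetime(2024, 7, 29, tzinfo=timezone.utc).isoformat()
    watermark = datetime.fromisoformat(
        InMemoryVariable.get("last_processed_timestamp", default_var=default)
    )
    new = [b for b in blobs if b.updated > watermark]
    if new:
        latest = max(b.updated for b in new)
        InMemoryVariable.set("last_processed_timestamp", latest.isoformat())
    return [b.name for b in new]


blobs = [
    FakeBlob("a.csv", datetime(2024, 7, 28, 12, tzinfo=timezone.utc)),
    FakeBlob("b.csv", datetime(2024, 7, 29, 6, tzinfo=timezone.utc)),
]
first = run_once(blobs)    # only b.csv is newer than the default watermark
second = run_once(blobs)   # nothing new, so the watermark stays put
blobs.append(FakeBlob("c.csv", datetime(2024, 7, 30, 1, tzinfo=timezone.utc)))
third = run_once(blobs)    # only the newly added c.csv
print(first, second, third)
```

Note that the watermark is global here, not per partition; scoping it to a
partition would mean keying the stored value by the partition as well.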




On Mon, 29 Jul 2024 at 19:32, Daniel Standish
<daniel.stand...@astronomer.io.invalid> wrote:

> > We should clarify in the AIP doc that the proposed partitioning feature
> > is not designed specifically to handle incremental loads in the
> > traditional sense. Instead, it is intended to manage and process data in
> > defined segments or partitions.
>
>
> Agree.
>
>
> > However, partitions can be used in conjunction with incremental loading
> > strategies. For example, a time-based partitioning scheme can ensure that
> > only data from relevant time periods is processed,
> > *and within those partitions, incremental updates can be tracked and
> > processed.*
>
>
> I'm not sure what you mean by this, particularly the bit I emphasized.  Can
> you try to clarify?
>
>
> On Mon, Jul 29, 2024 at 11:01 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>
> > Yeah, TP and I discussed that we aren't solving the incremental load
> > problem; folks can use it to achieve it similar to how you achieved it by
> > storing the Watermark in Variables and we can natively support it with a
> > revised AIP-30 in one of the minor releases for Airflow 3.
> >
> > We should clarify in the AIP doc that the proposed partitioning feature
> > is not designed specifically to handle incremental loads in the
> > traditional sense. Instead, it is intended to manage and process data in
> > defined segments or partitions.
> >
> > However, partitions can be used in conjunction with incremental loading
> > strategies. For example, a time-based partitioning scheme can ensure that
> > only data from relevant time periods is processed, and within those
> > partitions, incremental updates can be tracked and processed.
> >
> > Regards,
> > Kaxil
> >
> >
> >
> > On Mon, 29 Jul 2024 at 18:00, Daniel Standish
> > <daniel.stand...@astronomer.io.invalid> wrote:
> >
> > > Hi,
> > >
> > > *1. incremental loads*
> > >
> > > There is mention of incremental processing / incremental loads in the
> > > doc.
> > >
> > > E.g.
> > >
> > > > This is particularly useful for large datasets that need to be
> > > > processed incrementally or updated periodically.
> > >
> > >
> > > And
> > >
> > > > Facilitating Incremental Processing: Many modern data processing
> > > > strategies rely on incremental updates
> > >
> > >
> > > But there are no examples re how this solves for that use case.
> > >
> > > I think it's actually not good to think of or talk about incremental
> > > loads as "partitioned".
> > >
> > > Let me explain.
> > >
> > > An incremental load might track an `updated_at` column.  The data it
> > > processes is the data with an updated `updated_at` column.  But you
> > > would not be correct in calling this a partition of data.  Because when
> > > the data is updated again, it would now be in another partition.  That's
> > > not what partitioning is.
> > >
> > > If this is supposed to solve for incremental loads, I think an example
> > > is needed.  If it's not, let's call it out explicitly and say this is
> > > not solving for incremental loads.
> > >
> > > *2. support for tasks*
> > >
> > > I see this is specific to tasks defined with the asset syntax.  What's
> > > the story with "normal" dags and tasks e.g. with task flow or classic
> > > operators.  Is this AIP adding support only for assets?  Is there some
> > > plan for that?
> > >
> >
>
