For example:

Partitioning the Data:

@asset(..., schedule="@daily", partition=PartitionByInternal("@daily"))
def daily_sales_data():
    ...

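To make the daily partitioning concrete, here is a rough plain-Python sketch. It is not the proposed asset API: partition_key and group_by_partition are hypothetical helpers, and the record layout (an id plus an event_time) is assumed purely for illustration. Each record is assigned to a daily partition by its event time, so the partition it belongs to stays fixed:

```python
from datetime import datetime, timezone


def partition_key(event_time: datetime) -> str:
    """Hypothetical helper: map an event timestamp to a daily partition key (UTC)."""
    return event_time.astimezone(timezone.utc).strftime("%Y-%m-%d")


def group_by_partition(records):
    """Hypothetical helper: bucket records into daily partitions by event_time."""
    partitions = {}
    for record in records:
        key = partition_key(record["event_time"])
        partitions.setdefault(key, []).append(record)
    return partitions


# Assumed sample records, not real data.
records = [
    {"id": 1, "event_time": datetime(2024, 7, 28, 9, 30, tzinfo=timezone.utc)},
    {"id": 2, "event_time": datetime(2024, 7, 28, 23, 59, tzinfo=timezone.utc)},
    {"id": 3, "event_time": datetime(2024, 7, 29, 0, 5, tzinfo=timezone.utc)},
]

partitions = group_by_partition(records)
```

Because the key is derived from when the event happened rather than when the record was last touched, re-running the 2024-07-28 partition always covers the same records.
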
Incremental Processing within these Partitions might look something like
this, where we store the watermark in a Variable:

def fetch_data_from_gcs(last_processed_timestamp):
    ...
    blobs = bucket.list_blobs()
    # Fetch files updated after last_processed_timestamp
    new_data = []
    for blob in blobs:
        if blob.updated > last_processed_timestamp:
            new_data.append(blob)
    return new_data

@asset(schedule="@daily", partition=PartitionByInternal("@daily"))
def daily_sales_data():
    # Get the last processed timestamp
    # Fetch the last processed timestamp from Airflow Variable
    last_processed_timestamp_dt = Variable.get(
        "last_processed_timestamp", default_var=str(datetime(2024, 7, 29))
    )
    last_processed_timestamp = datetime.fromisoformat(last_processed_timestamp_dt)

    # Fetch new or updated data from GCS
    new_data = fetch_data_from_gcs(last_processed_timestamp)

    if new_data:
        # Process the new data
        process_partition(new_data)

        # Update the watermark to the latest timestamp from the new data
        latest_timestamp = max(blob.updated for blob in new_data)
        Variable.set("last_processed_timestamp", latest_timestamp.isoformat())

On Mon, 29 Jul 2024 at 19:32, Daniel Standish
<daniel.stand...@astronomer.io.invalid> wrote:

> > We should clarify in the AIP doc that the proposed partitioning feature is
> > not designed specifically to handle incremental loads in the traditional
> > sense. Instead, it is intended to manage and process data in defined
> > segments or partitions.
>
> Agree.
>
> > However, partitions can be used in conjunction with incremental loading
> > strategies. For example, a time-based partitioning scheme can ensure that
> > only data from relevant time periods is processed,
> > *and within those partitions, incremental updates can be tracked and
> > processed.*
>
> I'm not sure what you mean by this, particularly the bit I emphasized. Can
> you try to clarify?
>
> On Mon, Jul 29, 2024 at 11:01 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>
> > Yeah, TP and I discussed that we aren't solving the incremental load
> > problem; folks can use it to achieve it similar to how you achieved it by
> > storing the Watermark in Variables and we can natively support it with a
> > revised AIP-30 in one of the minor releases for Airflow 3.
> >
> > We should clarify in the AIP doc that the proposed partitioning feature is
> > not designed specifically to handle incremental loads in the traditional
> > sense. Instead, it is intended to manage and process data in defined
> > segments or partitions.
> >
> > However, partitions can be used in conjunction with incremental loading
> > strategies. For example, a time-based partitioning scheme can ensure that
> > only data from relevant time periods is processed, and within those
> > partitions, incremental updates can be tracked and processed.
> >
> > Regards,
> > Kaxil
> >
> > On Mon, 29 Jul 2024 at 18:00, Daniel Standish
> > <daniel.stand...@astronomer.io.invalid> wrote:
> >
> > > Hi,
> > >
> > > *1. incremental loads*
> > >
> > > There is mention of incremental processing / incremental loads in the
> > > doc.
> > >
> > > E.g.
> > >
> > > > This is particularly useful for large datasets that need to be
> > > > processed incrementally or updated periodically.
> > >
> > > And
> > >
> > > > Facilitating Incremental Processing: Many modern data processing
> > > > strategies rely on incremental updates
> > >
> > > But there are no examples re how this solves for that use case.
> > >
> > > I think it's actually not good to think of or talk about incremental
> > > loads as "partitioned".
> > >
> > > Let me explain.
> > >
> > > An incremental load might track an `updated_at` column. The data it
> > > processes is the data with an updated `updated_at` column. But you would
> > > not be correct in calling this a partition of data. Because when the
> > > data is updated again, it would now be in another partition. That's not
> > > what partitioning is.
> > >
> > > If this is supposed to solve for incremental loads, I think an example
> > > is needed. If it's not, let's call it out explicitly and say this is not
> > > solving for incremental loads.
> > >
> > > *2. support for tasks*
> > >
> > > I see this is specific to tasks defined with the asset syntax. What's
> > > the story with "normal" dags and tasks e.g. with task flow or classic
> > > operators. Is this AIP adding support only for assets? Is there some
> > > plan for that?