Thanks TP, I left a bunch of comments in the AIP doc. But overall, this looks really good.
Vikram

On Mon, Jul 29, 2024 at 2:23 PM Kaxil Naik <kaxiln...@gmail.com> wrote:

> For example: Partitioning the Data:
>
>     @asset(..., schedule="@daily", partition=PartitionByInternal("@daily"))
>     def daily_sales_data():
>         ...
>
> Incremental Processing within these Partitions might look something like
> where we store the watermark in a Variable:
>
>     def fetch_data_from_gcs(last_processed_timestamp):
>         ...
>         blobs = bucket.list_blobs()
>
>         # Fetch files updated after last_processed_timestamp
>         new_data = []
>         for blob in blobs:
>             if blob.updated > last_processed_timestamp:
>                 new_data.append(blob)
>
>         return new_data
>
>     @asset(schedule="@daily", partition=PartitionByInternal("@daily"))
>     def daily_sales_data():
>         # Get the last processed timestamp
>         # Fetch the last processed timestamp from Airflow Variable
>         last_processed_timestamp_dt = Variable.get(
>             "last_processed_timestamp", default_var=str(datetime(2024, 7, 29))
>         )
>         last_processed_timestamp = datetime.fromisoformat(last_processed_timestamp_dt)
>
>         # Fetch new or updated data from GCS
>         new_data = fetch_data_from_gcs(last_processed_timestamp)
>
>         if new_data:
>             # Process the new data
>             process_partition(new_data)
>
>             # Update the watermark to the latest timestamp from the new data
>             latest_timestamp = max(blob.updated for blob in new_data)
>             Variable.set("last_processed_timestamp", latest_timestamp.isoformat())
>
> On Mon, 29 Jul 2024 at 19:32, Daniel Standish
> <daniel.stand...@astronomer.io.invalid> wrote:
>
> > > We should clarify in the AIP doc that the proposed partitioning feature is
> > > not designed specifically to handle incremental loads in the traditional
> > > sense. Instead, it is intended to manage and process data in defined
> > > segments or partitions.
> >
> > Agree.
> >
> > > However, partitions can be used in conjunction with incremental loading
> > > strategies. For example, a time-based partitioning scheme can ensure that
> > > only data from relevant time periods is processed,
> > > *and within those partitions, incremental updates can be tracked and
> > > processed.*
> >
> > I'm not sure what you mean by this, particularly the bit I emphasized. Can
> > you try to clarify?
> >
> > On Mon, Jul 29, 2024 at 11:01 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >
> > > Yeah, TP and I discussed that we aren't solving the incremental load
> > > problem; folks can use it to achieve it similar to how you achieved it by
> > > storing the Watermark in Variables and we can natively support it with a
> > > revised AIP-30 in one of the minor releases for Airflow 3.
> > >
> > > We should clarify in the AIP doc that the proposed partitioning feature is
> > > not designed specifically to handle incremental loads in the traditional
> > > sense. Instead, it is intended to manage and process data in defined
> > > segments or partitions.
> > >
> > > However, partitions can be used in conjunction with incremental loading
> > > strategies. For example, a time-based partitioning scheme can ensure that
> > > only data from relevant time periods is processed, and within those
> > > partitions, incremental updates can be tracked and processed.
> > >
> > > Regards,
> > > Kaxil
> > >
> > > On Mon, 29 Jul 2024 at 18:00, Daniel Standish
> > > <daniel.stand...@astronomer.io.invalid> wrote:
> > >
> > > > Hi,
> > > >
> > > > *1. incremental loads*
> > > >
> > > > There is mention of incremental processing / incremental loads in the
> > > > doc.
> > > >
> > > > E.g.
> > > >
> > > > > This is particularly useful for large datasets that need to be processed
> > > > > incrementally or updated periodically.
> > > >
> > > > And
> > > >
> > > > > Facilitating Incremental Processing: Many modern data processing
> > > > > strategies rely on incremental updates
> > > >
> > > > But there are no examples re how this solves for that use case.
> > > >
> > > > I think it's actually not good to think of or talk about incremental
> > > > loads as "partitioned".
> > > >
> > > > Let me explain.
> > > >
> > > > An incremental load might track an `updated_at` column. The data it
> > > > processes is the data with an updated `updated_at` column. But you would
> > > > not be correct in calling this a partition of data. Because when the data
> > > > is updated again, it would now be in another partition. That's not what
> > > > partitioning is.
> > > >
> > > > If this is supposed to solve for incremental loads, I think an example is
> > > > needed. If it's not, let's call it out explicitly and say this is not
> > > > solving for incremental loads.
> > > >
> > > > *2. support for tasks*
> > > >
> > > > I see this is specific to tasks defined with the asset syntax. What's the
> > > > story with "normal" dags and tasks e.g. with task flow or classic
> > > > operators. Is this AIP adding support only for assets? Is there some plan
> > > > for that?
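The distinction drawn in the quoted "1. incremental loads" item, between a partition and the set of rows picked up by an `updated_at` watermark, can be sketched in plain Python. This is only an illustration and uses nothing from Airflow or the AIP: the row layout, the `sale_date` column, and the helper names `partition_key` and `incremental_batch` are all made up for the example.

```python
from datetime import date, datetime

# Hypothetical rows; the column names are illustrative only.
rows = [
    {"id": 1, "sale_date": date(2024, 7, 28), "updated_at": datetime(2024, 7, 28, 9, 0)},
    {"id": 2, "sale_date": date(2024, 7, 28), "updated_at": datetime(2024, 7, 28, 17, 30)},
    {"id": 3, "sale_date": date(2024, 7, 29), "updated_at": datetime(2024, 7, 29, 8, 15)},
]


def partition_key(row):
    """Partition membership comes from an attribute that stays fixed for the row."""
    return row["sale_date"]


def incremental_batch(rows, watermark):
    """An incremental load picks up whatever changed after the stored watermark."""
    return [r["id"] for r in rows if r["updated_at"] > watermark]


# First run: rows touched after the stored watermark.
watermark = datetime(2024, 7, 28, 12, 0)
first_batch = incremental_batch(rows, watermark)

# Advance the watermark to the newest change seen so far.
watermark = max(r["updated_at"] for r in rows)

# Row 1 is edited later. Its partition key is unchanged, but it now
# belongs to a different incremental batch than it would have before.
rows[0]["updated_at"] = datetime(2024, 7, 30, 10, 0)
second_batch = incremental_batch(rows, watermark)
```

Row 1 keeps the same `sale_date` partition throughout, yet it shows up in the second incremental batch only because it was modified. Which batch a row lands in depends on when it was last touched, which is why the watermark-selected set is not a partition in the usual sense.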