suryaprasanna opened a new issue, #18298:
URL: https://github.com/apache/hudi/issues/18298

   ### Feature Description
   
   **What the feature achieves:**
   This feature would enable automatic collection and tracking of data 
freshness metrics for Hudi tables by exposing lineage information from write 
operations.
   Key capabilities:
   - Automatic freshness metrics collection based on upstream sources (Kafka 
topics, other datasets, etc.)
   - Date-wise freshness metrics based on partitions read or written; applicable only to date-based partitions (initial phase)
   - Commit-level freshness metrics, from which the true freshness of a dataset can be calculated (future phase)
   - Lineage information returned from write operations that can be consumed by 
external freshness tracking services
   - Unified freshness tracking across both Hudi and traditional Hive datasets 
in the datalake.
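
   The date-wise capability above can be sketched independently of Hudi internals. A minimal helper, assuming "yyyy-MM-dd" partition values (an assumption for illustration; `freshnessFromPartitions` is a hypothetical name, not an existing Hudi API):

   ```scala
   import java.time.LocalDate
   import scala.util.Try

   // Hypothetical helper: derive a date-wise freshness value from the
   // date partitions touched by a commit. Non-date partition values are
   // skipped; the freshness is the most recent date partition written.
   def freshnessFromPartitions(writtenPartitions: Seq[String]): Option[LocalDate] =
     writtenPartitions
       .flatMap(p => Try(LocalDate.parse(p)).toOption)
       .reduceOption((a, b) => if (a.isAfter(b)) a else b)
   ```

   Commit-level freshness (the future phase) would replace the partition dates here with commit instant times.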
   
   **Why this feature is needed:**
   A data lake typically contains multiple table formats, such as Hudi, Iceberg, and traditional Hive datasets, and we need a way to track data freshness across all of them. Currently:
   - No built-in mechanism: there is no way to collect freshness values for Hudi datasets in the data lake.
   - Missing lineage information: write operations (the df.write API, INSERT OVERWRITE, INSERT INTO, MERGE INTO) return Seq.empty[Row] without providing any lineage or freshness metadata to engines like Spark.
   - Manual tracking is not scalable: without automatic collection, tracking freshness across multiple datasets is error-prone and does not scale.
   - No integration points: external freshness services cannot automatically receive updates from Hudi write operations.
   
   ### User Experience
   
   **How users will use this feature:**
   - Configuration changes needed
   - API changes
     - Current behavior:
   ```scala
   // InsertIntoHoodieTableCommand and similar operations return empty results
   val result: Seq[Row] = Seq.empty[Row]
   ```
     - Proposed behavior:
   ```scala
   // Return lineage and freshness information
   // (schema: table_name, commit_time, upstream_sources,
   //  freshness_timestamp, record_count)
   val result: Seq[Row] = Seq(
     Row(
       "my_hudi_table",                                // table_name
       "20260309120000",                               // commit_time
       Array("kafka_topic_xyz", "upstream_table_abc"), // upstream_sources
       "2026-03-09T12:00:00Z",                         // freshness_timestamp
       1000000L                                        // record_count
     )
   )
   ```
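
   For illustration, the proposed payload could also be modeled as a plain Scala case class before being mapped into a Row. The field names mirror the example above but are not part of any existing Hudi API; `toPayload` is a hypothetical helper showing how the payload could be flattened for an external service:

   ```scala
   // Hypothetical model of the proposed lineage payload.
   case class LineageInfo(
     tableName: String,
     commitTime: String,
     upstreamSources: Seq[String],
     freshnessTimestamp: String,
     recordCount: Long
   )

   // Flattening to a string -> string map keeps the payload
   // engine-agnostic for external freshness services.
   def toPayload(info: LineageInfo): Map[String, String] = Map(
     "table_name" -> info.tableName,
     "commit_time" -> info.commitTime,
     "upstream_sources" -> info.upstreamSources.mkString(","),
     "freshness_timestamp" -> info.freshnessTimestamp,
     "record_count" -> info.recordCount.toString
   )
   ```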
   - Usage examples
   
     Example 1: Spark DataFrame Write API

     ```scala
     // Read from an upstream source (e.g. Kafka)
     val df = spark.read.format("kafka")...

     // Write to the Hudi table
     val result = df.write
       .format("hudi")
       .option("hoodie.freshness.tracking.enabled", "true")
       .save("path/to/hudi/table")

     // Consume freshness information
     result.foreach { row =>
       val freshnessInfo = row.getAs[Map[String, Any]]("lineage_info")
       freshnessService.update(freshnessInfo)
     }
     ```
   
     Example 2: Spark SQL

     ```sql
     INSERT INTO hudi_table
     SELECT * FROM kafka_source;

     -- Freshness info automatically sent to configured service
     -- or available via Spark's QueryExecution
     ```
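
     A minimal in-memory stand-in for the "configured service" mentioned above. The real service and its wire protocol are outside Hudi, so this is purely a sketch; it relies only on the fact that Hudi commit instant times are sortable "yyyyMMddHHmmss" strings:

     ```scala
     import scala.collection.mutable

     // Illustrative freshness service: tracks the latest commit time
     // seen per table. Not part of Hudi; the name and API are assumptions.
     class FreshnessService {
       private val latest = mutable.Map.empty[String, String]

       // Lexicographic comparison of "yyyyMMddHHmmss" instants keeps
       // the recorded freshness monotonically non-decreasing.
       def update(table: String, commitTime: String): Unit =
         if (latest.get(table).forall(_ < commitTime))
           latest(table) = commitTime

       def freshnessOf(table: String): Option[String] = latest.get(table)
     }
     ```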
   
   
   ### Hudi RFC Requirements
   
   **RFC PR link:** (if applicable)
   
   **Why RFC is/isn't needed:**
   - Does this change public interfaces/APIs? (Yes/No)
   - Does this change storage format? (Yes/No)
   - Justification:
   

