cshuo opened a new issue, #14252:
URL: https://github.com/apache/hudi/issues/14252

   ### Feature Description
   
   **What the feature achieves:**
   
   Support out-of-the-box schema on write for the Flink writer.
   
   **Why this feature is needed:**
   
   Currently, Spark writers support schema on write out of the box: 
https://hudi.apache.org/docs/schema_evolution/#schema-evolution-in-action. 
This lets users write records to a Hudi table under backwards-compatible 
schema evolution scenarios, such as adding a nullable field or promoting a 
field's type. The Flink writer does not yet fully support schema on write, so 
we should add this support to the Flink writer.
   
   For example, the following sequence of writes to a COW table, where the 
second write promotes a field's type, is not supported.
   ```java
       // writer with schema1
       DataStream<RowData> dataStream = execEnv.fromData(
           createRowData("id1", "Alice", 25, Timestamp.from(Instant.now()), "par1"),
           createRowData("id2", "Lily", 21, Timestamp.from(Instant.now()), "par1"),
           createRowData("id3", "Julia", 15, Timestamp.from(Instant.now()), "par1")
       );
       HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
           .column("uuid string not null")
           .column("name string")
           .column("age int")
           .column("`ts` timestamp(3)")
           .column("`partition` string")
           .pk("uuid")
           .partition("partition")
           .options(options);
       builder.sink(dataStream, false);
       execute(execEnv, false, "Api_Sink_Test");
      
       // writer with schema2, promoting `age` from INT to DOUBLE
       dataStream = execEnv.fromData(
           createRowData("id1", "Alice", 25.1, Timestamp.from(Instant.now().plusMillis(1000)), "par1"),
           createRowData("id2", "Lily", 21.2, Timestamp.from(Instant.now().plusMillis(1000)), "par1"),
           createRowData("id3", "Julia", 15.3, Timestamp.from(Instant.now().plusMillis(100)), "par1")
       );
       builder = HoodiePipeline.builder("test_sink")
           .column("uuid string not null")
           .column("name string")
           .column("age double")
           .column("`ts` timestamp(3)")
           .column("`partition` string")
           .pk("uuid")
           .partition("partition")
           .options(options);
       builder.sink(dataStream, false);
       execute(execEnv, false, "Api_Sink_Test_1");
   
   ```
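
To make "backwards-compatible" more concrete, here is a minimal, self-contained sketch of the kind of check a writer could run before accepting a new schema. It is not Hudi's implementation: `SchemaCompatSketch`, `FieldType`, `Field`, `canPromote` and `isBackwardCompatible` are hypothetical names, and the promotion table only covers the numeric widenings from Avro's schema resolution rules (int to long/float/double, long to float/double, float to double). The set of promotions Hudi actually accepts should be taken from the schema evolution docs linked above.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types exist in Hudi or Flink.
class SchemaCompatSketch {
    enum FieldType { INT, LONG, FLOAT, DOUBLE, STRING }

    static final class Field {
        final FieldType type;
        final boolean nullable;
        Field(FieldType type, boolean nullable) {
            this.type = type;
            this.nullable = nullable;
        }
    }

    // Assumption: only Avro's numeric widenings are treated as valid promotions.
    static final Map<FieldType, Set<FieldType>> PROMOTIONS = new EnumMap<>(FieldType.class);
    static {
        PROMOTIONS.put(FieldType.INT, EnumSet.of(FieldType.LONG, FieldType.FLOAT, FieldType.DOUBLE));
        PROMOTIONS.put(FieldType.LONG, EnumSet.of(FieldType.FLOAT, FieldType.DOUBLE));
        PROMOTIONS.put(FieldType.FLOAT, EnumSet.of(FieldType.DOUBLE));
    }

    // True if the type is unchanged or widened according to the table above.
    static boolean canPromote(FieldType from, FieldType to) {
        return from == to
            || PROMOTIONS.getOrDefault(from, Collections.<FieldType>emptySet()).contains(to);
    }

    // Simplified rules: every existing column must still be present with the
    // same or a promoted type, and every newly added column must be nullable.
    // Dropped columns and nullability changes on existing columns are not modeled.
    static boolean isBackwardCompatible(Map<String, Field> tableSchema,
                                        Map<String, Field> writerSchema) {
        for (Map.Entry<String, Field> e : tableSchema.entrySet()) {
            Field incoming = writerSchema.get(e.getKey());
            if (incoming == null || !canPromote(e.getValue().type, incoming.type)) {
                return false;
            }
        }
        for (Map.Entry<String, Field> e : writerSchema.entrySet()) {
            if (!tableSchema.containsKey(e.getKey()) && !e.getValue().nullable) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the example above: `age` goes from INT (schema1) to DOUBLE (schema2).
        Map<String, Field> schema1 = new LinkedHashMap<>();
        schema1.put("uuid", new Field(FieldType.STRING, false));
        schema1.put("age", new Field(FieldType.INT, true));

        Map<String, Field> schema2 = new LinkedHashMap<>(schema1);
        schema2.put("age", new Field(FieldType.DOUBLE, true));

        System.out.println(isBackwardCompatible(schema1, schema2)); // true
        System.out.println(isBackwardCompatible(schema2, schema1)); // false
    }
}
```

Under these assumptions the INT to DOUBLE change in the example counts as compatible, while the reverse (narrowing DOUBLE back to INT) does not.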
   
   ### User Experience
   
   **How users will use this feature:**
   - Configuration changes needed
   - API changes
   - Usage examples
   
   
   ### Hudi RFC Requirements
   
   **RFC PR link:** (if applicable)
   
   **Why RFC is/isn't needed:**
   - Does this change public interfaces/APIs? (Yes/No)
   - Does this change storage format? (Yes/No)
   - Justification:
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
