[ 
https://issues.apache.org/jira/browse/HUDI-3953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated HUDI-3953:
--------------------------
    Description: 
Currently, the Flink Hudi module only supports SQL APIs. Users who want 
low-level APIs, e.g. for operating on Flink state or for other purposes, 
have no convenient way to do so.

A low-level API can be provided for users to write and read Hudi data.

The API design and main changes will be:
 # add sink and source APIs in Pipelines
 # getSinkRuntimeProvider in HoodieTableSink calls Pipelines.sink(...) to return 
a DataStreamSink
 # getScanRuntimeProvider in HoodieTableSource calls Pipelines.source() to 
return a DataStream
 # move common methods such as getInputFormat into a util class
 # the low-level read and write APIs simply call Pipelines.sink(...) and 
Pipelines.source()
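
A minimal, self-contained sketch of how a fluent builder over the proposed 
Pipelines.source(...) entry point might look. All class and method names below 
are hypothetical placeholders for illustration only, not the actual Hudi Flink 
API; a real build() would wire up a Flink DataStream instead of returning a 
summary string.

{code}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed fluent builder surface.
class HoodieSourceSketch {
    private final Map<String, String> options = new HashMap<>();
    private List<String> partitions;

    static HoodieSourceSketch builder() {
        return new HoodieSourceSketch();
    }

    HoodieSourceSketch option(String key, String value) {
        options.put(key, value);
        return this;
    }

    HoodieSourceSketch partitions(List<String> partitions) {
        this.partitions = partitions;
        return this;
    }

    // In the real module this would delegate to Pipelines.source(...);
    // here it only summarizes the collected configuration.
    String build() {
        return "source[options=" + options.size()
                + ", partitions=" + partitions + "]";
    }
}

public class BuilderDemo {
    public static void main(String[] args) {
        String pipeline = HoodieSourceSketch.builder()
                .option("connector", "hudi")
                .option("table.type", "MERGE_ON_READ")
                .partitions(Arrays.asList("dt", "hr"))
                .build();
        System.out.println(pipeline);
    }
}
{code}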

  was:
Currently, the Flink Hudi module only supports SQL APIs. Users who want 
low-level APIs, e.g. for operating on Flink state or for other purposes, 
have no convenient way to do so.

A low-level API can be provided for users to write and read Hudi data.

Read example:

{code}
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
confMap.put("read.streaming.enabled", "true");
confMap.put("read.streaming.check-interval", "4");

DataStream<Row> rowDataStream = SourceBuilder
        .builder()
        .env(getEnv())
        .schema(CREATE_TABLE_SCHEMA)
        .options(confMap)
        .partitions(Arrays.asList("dt", "hr"))
        .build();
rowDataStream.print();
{code}

Write example:

{code}
DataStream<Row> input = dataStreamGen();
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
confMap.put("hive_sync.enable", "true");
confMap.put("hive_sync.table", "mor_test9");
confMap.put("hive_sync.db", "hudi_db");
confMap.put("hive_sync.username", "hoodie");
confMap.put("hive_sync.password", "hoodie");
confMap.put("hive_sync.mode", "hms");
confMap.put("hive_sync.metastore.uris", "thrift://localhost:9083");

SinkBuilder.builder()
        .input(input)
        .options(confMap)
        .partitions(Arrays.asList("dt", "hr"))
        .schema(CREATE_TABLE_SCHEMA)
        .build();
{code}
 


> Flink Hudi module should support  low-level read and write APIs
> ---------------------------------------------------------------
>
>                 Key: HUDI-3953
>                 URL: https://issues.apache.org/jira/browse/HUDI-3953
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: yuemeng
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
