[
https://issues.apache.org/jira/browse/HUDI-3953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yuemeng updated HUDI-3953:
--------------------------
Description:
Currently, the Flink Hudi module only supports SQL APIs. Users who want
low-level APIs, e.g. for operating on Flink state or for other purposes,
have no convenient way to do so.
We can provide low-level APIs for users to write and read Hudi data.
The API design and main changes will be:
# Add sink and source APIs in Pipelines
# getSinkRuntimeProvider in HoodieTableSink calls Pipelines.sink(...) to
return a DataStreamSink
# getScanRuntimeProvider in HoodieTableSource calls Pipelines.source(...) to
return a DataStream
# Move common methods such as getInputFormat into a util class
# The low-level read and write APIs simply call Pipelines.sink(...) and
Pipelines.source(...)
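The steps above could be exercised roughly as follows. This is only a sketch of the proposed design: the Pipelines.sink(...) and Pipelines.source(...) signatures shown are assumptions based on this proposal, not an existing public API, and the FlinkOptions keys are used for illustration.
{code}
// Sketch only: Pipelines.sink(...)/Pipelines.source(...) signatures below
// are assumed from this proposal, not an existing public API.
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

// Read: build a DataStream of Hudi records from the table path.
DataStream<RowData> source = Pipelines.source(env, conf);

// Write: route an input DataStream into a Hudi sink.
DataStreamSink<?> sink = Pipelines.sink(conf, source);
{code}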
was:
Currently, the Flink Hudi module only supports SQL APIs. Users who want
low-level APIs, e.g. for operating on Flink state or for other purposes,
have no convenient way to do so.
We can provide low-level APIs for users to write and read Hudi data.
Read example:
{code}
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
confMap.put("read.streaming.enabled", "true");
confMap.put("read.streaming.check-interval", "4");

DataStream<Row> rowDataStream = SourceBuilder
    .builder()
    .env(getEnv())
    .schema(CREATE_TABLE_SCHEMA)
    .options(confMap)
    .partitions(Arrays.asList("dt", "hr"))
    .build();
rowDataStream.print();
{code}
Write example:
{code}
DataStream<Row> input = dataStreamGen();
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
confMap.put("hive_sync.enable", "true");
confMap.put("hive_sync.table", "mor_test9");
confMap.put("hive_sync.db", "hudi_db");
confMap.put("hive_sync.username", "hoodie");
confMap.put("hive_sync.password", "hoodie");
confMap.put("hive_sync.mode", "hms");
confMap.put("hive_sync.metastore.uris", "thrift://localhost:9083");

SinkBuilder.builder()
    .input(input)
    .options(confMap)
    .partitions(Arrays.asList("dt", "hr"))
    .schema(CREATE_TABLE_SCHEMA)
    .build();
{code}
> Flink Hudi module should support low-level read and write APIs
> ---------------------------------------------------------------
>
> Key: HUDI-3953
> URL: https://issues.apache.org/jira/browse/HUDI-3953
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: yuemeng
> Priority: Major
> Labels: pull-request-available
>
> Currently, the Flink Hudi module only supports SQL APIs. Users who want
> low-level APIs, e.g. for operating on Flink state or for other purposes,
> have no convenient way to do so.
> We can provide low-level APIs for users to write and read Hudi data.
> The API design and main changes will be:
> # Add sink and source APIs in Pipelines
> # getSinkRuntimeProvider in HoodieTableSink calls Pipelines.sink(...) to
> return a DataStreamSink
> # getScanRuntimeProvider in HoodieTableSource calls Pipelines.source(...) to
> return a DataStream
> # Move common methods such as getInputFormat into a util class
> # The low-level read and write APIs simply call Pipelines.sink(...) and
> Pipelines.source()
--
This message was sent by Atlassian Jira
(v8.20.7#820007)