yuemeng created HUDI-3953:
-----------------------------
Summary: Flink Hudi module should support low-level read and
write APIs
Key: HUDI-3953
URL: https://issues.apache.org/jira/browse/HUDI-3953
Project: Apache Hudi
Issue Type: Improvement
Reporter: yuemeng
Currently, the Flink Hudi module only supports SQL APIs. Users who want to work
with low-level APIs, for example to operate on Flink state or for other
purposes, have no convenient way to do so.
It would be useful to provide low-level APIs for users to read and write Hudi data.
Read example:
{code}
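// Connector options, using the same keys as the SQL WITH clause.
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
confMap.put("read.streaming.enabled", "true");
// Interval, in seconds, at which the source checks for new commits.
confMap.put("read.streaming.check-interval", "4");

// SourceBuilder is the proposed API; getEnv() and CREATE_TABLE_SCHEMA are
// placeholders for the execution environment and the table schema.
DataStream<Row> rowDataStream = SourceBuilder
    .builder()
    .env(getEnv())
    .schema(CREATE_TABLE_SCHEMA)
    .options(confMap)
    .partitions(Arrays.asList("dt", "hr"))
    .build();
rowDataStream.print();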
{code}
Write example:
{code}
// dataStreamGen() is a placeholder for any upstream DataStream<Row>.
DataStream<Row> input = dataStreamGen();

// Connector options, using the same keys as the SQL WITH clause.
Map<String, String> confMap = new HashMap<>();
confMap.put("connector", "hudi");
confMap.put("table.type", "MERGE_ON_READ");
confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
// Hive sync settings (metastore mode).
confMap.put("hive_sync.enable", "true");
confMap.put("hive_sync.table", "mor_test9");
confMap.put("hive_sync.db", "hudi_db");
confMap.put("hive_sync.username", "hoodie");
confMap.put("hive_sync.password", "hoodie");
confMap.put("hive_sync.mode", "hms");
confMap.put("hive_sync.metastore.uris", "thrift://localhost:9083");

// SinkBuilder is the proposed API; CREATE_TABLE_SCHEMA is a placeholder.
SinkBuilder.builder()
    .input(input)
    .options(confMap)
    .partitions(Arrays.asList("dt", "hr"))
    .schema(CREATE_TABLE_SCHEMA)
    .build();
{code}
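For comparison with the SQL-only path that exists today, the option maps in the examples use the same keys that users currently pass in the WITH clause of a CREATE TABLE statement. The following is a minimal JDK-only sketch of that correspondence, not part of Hudi or of the proposed API: the class HudiDdlSketch, its methods, the table name, and the column list are all hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only; not a Hudi or Flink class.
class HudiDdlSketch {

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * Renders a Flink SQL CREATE TABLE statement from a column list,
     * partition fields, and a connector option map.
     */
    public static String toCreateTable(String table, String columns,
                                       List<String> partitions,
                                       Map<String, String> options) {
        // One "'key' = 'value'" line per option, in insertion order.
        String with = options.entrySet().stream()
            .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
            .collect(Collectors.joining(",\n"));

        // PARTITIONED BY is emitted only when partition fields are given.
        String partitionedBy = partitions.isEmpty()
            ? ""
            : "PARTITIONED BY ("
                + partitions.stream()
                    .map(p -> "`" + p + "`")
                    .collect(Collectors.joining(", "))
                + ")\n";

        return "CREATE TABLE `" + table + "` (\n" + columns + "\n)\n"
            + partitionedBy
            + "WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // Same option keys as in the read example.
        Map<String, String> confMap = new LinkedHashMap<>();
        confMap.put("connector", "hudi");
        confMap.put("table.type", "MERGE_ON_READ");
        confMap.put("path", "hdfs://127.0.0.1:9000/hudi/hudi_db/mor_test9");
        confMap.put("read.streaming.enabled", "true");
        confMap.put("read.streaming.check-interval", "4");

        // Assumed columns; the real CREATE_TABLE_SCHEMA is not shown in this issue.
        String columns = "  `uuid` STRING,\n  `name` STRING,\n  `dt` STRING,\n  `hr` STRING";

        System.out.println(
            toCreateTable("mor_test9", columns, List.of("dt", "hr"), confMap));
    }
}
```

A builder-style API would let DataStream users supply the same options, schema, and partition fields directly, without going through a DDL string like the one printed here.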
--
This message was sent by Atlassian Jira
(v8.20.7#820007)