danny0405 commented on code in PR #12697:
URL: https://github.com/apache/hudi/pull/12697#discussion_r1934943752
########## rfc/rfc-84/rfc-84.md: ##########
@@ -24,8 +24,113 @@
 ## Approvers
 - @danny0405
-- @...
+- @xiarixiaoyao
+- @yuzhaojing
-## Status: Claim
+## Status
+
+Design is under discussion. JIRA: [HUDI-8799](https://issues.apache.org/jira/browse/HUDI-8799)
+
+## Abstract
+
+Currently, in most scenarios where Flink writes into Hudi, the first step converts the row data into an Avro record, which is used for key generation
+and passed to the downstream operators as part of a `HoodieRecord`. The Kryo serializer is used to serialize/deserialize those records.
+As mentioned in the [claim for this RFC](https://github.com/apache/hudi/pull/12550), Kryo serde costs are substantial, which is unacceptable for stream processing.
+
+This RFC proposes a data processing design focused on Flink performance, built around Flink's internal data types and serialization.
+
+## Background
+
+Currently, `HoodieRecord` is the standardized API for interacting with a single record; see [RFC-46](../rfc-46/rfc-46.md).
+But the complex structure of `HoodieRecord` leads to high serialization/deserialization costs whenever it needs to be sent.
+So for Flink's main use case of stream processing, where each record is handled separately by different operators,
+the current approach with an eager conversion into `HoodieRecord` becomes unacceptable.
+
+Conversion into `HoodieRecord` should be done only in the operators that actually perform the write into HDFS, S3, etc., to prevent excessive costs.
+This also enables a future optimization of writing Flink `RowData` directly to Parquet files, if needed, without any intermediate conversions.
+In Flink pipelines we could keep the internal `RowData` together with only the necessary Hudi metadata,
+and this metadata should be added with Flink's data types in mind.
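To make the intended shape concrete, here is a minimal plain-Java sketch of keeping the payload row untouched and attaching only a small metadata pair. `Row`, `Meta`, `Wrapped`, and the field positions are all hypothetical stand-ins, not Flink or Hudi classes:

```java
// Plain-Java sketch: keep the payload row untouched and attach only a small
// metadata pair, instead of converting the row into a HoodieRecord.
// Row, Meta, Wrapped and the field positions are hypothetical stand-ins,
// not Flink or Hudi classes.
public final class MetadataWrapSketch {

    // Stand-in for Flink's RowData: an ordered list of field values.
    public record Row(Object[] fields) {
        public String getString(int pos) {
            return (String) fields[pos];
        }
    }

    // Stand-in for the metadata part (e.g. record key and partition path).
    public record Meta(String recordKey, String partitionPath) {}

    // Stand-in for a (metadata, payload) pair.
    public record Wrapped(Meta meta, Row payload) {}

    // A real implementation would take the key/partition positions from the
    // table configuration; here they are plain arguments.
    public static Wrapped wrap(Row row, int keyPos, int partitionPos) {
        Meta meta = new Meta(row.getString(keyPos), row.getString(partitionPos));
        return new Wrapped(meta, row); // payload passed through, no rewriting
    }

    public static void main(String[] args) {
        Row row = new Row(new Object[] {"id-42", "par1", "some-value"});
        Wrapped w = wrap(row, 0, 1);
        System.out.println(w.meta().recordKey() + " / " + w.meta().partitionPath());
    }
}
```

The point of the sketch is that the payload reference is forwarded as-is; only the small metadata object is newly allocated per record.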
+There are seven different categories of
+[supported data types](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)
+in Flink: tuples, POJOs, primitive types, regular classes, values, Hadoop writables, and special types.
+Among these, tuples are the least flexible but offer the best performance.
+With tuples there is no need for a custom type description or a custom serializer: we can use Flink's existing
+[`TupleTypeInfo`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java)
+and
+[`TupleSerializer`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java).
+(All links to Flink documentation or code are provided for version 1.20.)
+
+## Implementation
+
+To prepare an implementation plan, we should start with a review of the current state.
+
+### Write
+
+All scenarios of Flink writing into Hudi can be presented in the diagram below:
+
+There are two special cases of processing: bulk insert, and append mode (insert operation into MOR and COW without inline clustering), shown in the lower part of the diagram.
+For both of these cases there is no conversion of `RowData` into `HoodieRecord`, so our main focus should be on the upper part of the diagram.
+
+Flink automatically chains operators together when possible, which means that one operator combines multiple transformations.
+Within a chained operator there is no serialization/deserialization between the combined transformations.
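As an aside on why fixed-arity tuples serialize cheaply: when the arity and the component types are known statically, a serializer can write the field payloads back to back, with no per-record class tags of the kind a generic serializer such as Kryo may need. A minimal illustrative sketch (not Flink's actual `TupleSerializer`) of a two-field string pair:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only -- not Flink's TupleSerializer. A fixed-arity pair
// whose component types are known up front can be written as bare payloads,
// with no per-record class tags.
public final class PairSerdeSketch {

    public record Pair(String f0, String f1) {}

    // Layout: [len(f0)][utf8(f0)][len(f1)][utf8(f1)] -- nothing else.
    public static byte[] serialize(Pair p) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            writeString(out, p.f0());
            writeString(out, p.f1());
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams do not throw
        }
    }

    public static Pair deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return new Pair(readString(in), readString(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(b.length);
        out.write(b);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Pair original = new Pair("uuid-1", "par1");
        Pair copy = deserialize(serialize(original));
        System.out.println(original.equals(copy)); // round trip preserves the pair
    }
}
```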
+But transformations separated by partitioners, such as
+[`keyBy()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L304-L308),
+which uses
+[`KeyGroupStreamPartitioner`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java),
+or
+[`partitionCustom()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L398-L402),
+which expects a user-defined custom partitioner, cannot be chained.
+These partitioners are marked as purple blocks in the diagram, and at those places we face high serialization/deserialization costs.
+
+### Read
+
+As for reading a Hudi table with Flink, at first blush there is no need to implement such optimizations there.
+
+### Metadata
+
+Writing into a Hudi table starts from a `DataStream<RowData>`.
+The Hudi metadata necessary for processing is marked in red in the diagram above.
+We could use the
+[`map()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L611-L614)
+transformation to convert incoming `RowData` into `Tuple(Tuple(StringData, ..., StringData), RowData)`
+instead of `HoodieRecord`, where the first sub-tuple contains the necessary metadata.
+In this case the return type can be described as `TupleTypeInfo<>(TupleTypeInfo<>(...), dataStream.getType())`, which forces Flink to use the internal `TupleSerializer`.
+
+### Potential problems
+
+1. Key generators are tightly coupled with Avro `GenericRecord`.
+   Therefore, to support all key generators we will have to do an intermediate conversion into Avro in the operator that is responsible for getting the Hudi key.

Review Comment:
   @cshuo has recently been working on a new RFC to add basic abstractions of schema/data types/expressions to Hudi, so that we can integrate with the engine-specific "row" for both the writer and the reader. The design doc will be coming out; I will cc you if you have interest in it. It's a huge task, and maybe you can help with it.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
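One conceivable direction for the key generator coupling noted under potential problems (a hypothetical sketch; `FieldAccessor` and `KeyGenSketch` are illustrative names, not existing Hudi APIs, and the real key generators' output format is not reproduced here) is to express key generation against a minimal field accessor, so the same logic could read engine-native row fields directly and fall back to an Avro conversion only for generators that cannot be ported:

```java
import java.util.Map;

// Hypothetical sketch -- FieldAccessor and this class are illustrative names,
// not existing Hudi APIs. Key generation is expressed against a minimal field
// accessor, so it can read engine-native rows without an Avro conversion.
public final class KeyGenSketch {

    // Minimal abstraction: resolve a field name to its string value.
    public interface FieldAccessor {
        String getField(String name);
    }

    // A complex-key style generator: joins "name=value" parts with ':'.
    // (The exact formatting of real Hudi key generators is not reproduced.)
    public static String recordKey(FieldAccessor row, String... keyFields) {
        StringBuilder sb = new StringBuilder();
        for (String f : keyFields) {
            if (sb.length() > 0) {
                sb.append(':');
            }
            sb.append(f).append('=').append(row.getField(f));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Backed by a plain map here; with Flink this could read RowData fields.
        Map<String, String> fields = Map.of("id", "42", "region", "eu");
        FieldAccessor accessor = fields::get;
        System.out.println(recordKey(accessor, "id", "region")); // id=42:region=eu
    }
}
```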
