geserdugarov commented on code in PR #12697:
URL: https://github.com/apache/hudi/pull/12697#discussion_r1932042819


##########
rfc/rfc-84/rfc-84.md:
##########
@@ -24,8 +24,113 @@
 ## Approvers
 
 - @danny0405
-- @...
+- @xiarixiaoyao
+- @yuzhaojing
 
-## Status: Claim
+## Status
+
+Design is under discussion.
 
 JIRA: [HUDI-8799](https://issues.apache.org/jira/browse/HUDI-8799)
+
+## Abstract
+
+Currently, in the majority of scenarios when Flink writes into Hudi, the first step is conversion of the row data into an Avro record, which is used for key generation,
+and is passed to the following operators as part of `HoodieRecord`. The Kryo serializer is used to serialize/deserialize those records.
+As mentioned in the [claim for this RFC](https://github.com/apache/hudi/pull/12550), Kryo serde costs are huge, which is unacceptable for stream processing.
+
+This RFC proposes implementing data processing for Flink with a focus on performance, taking Flink's internal data types and serialization into account.
+
+## Background
+
+Currently, `HoodieRecord` is the standardized API for interacting with a single record, see [RFC-46](../rfc-46/rfc-46.md).
+But the complex structure of `HoodieRecord` leads to high serialization/deserialization costs whenever a record needs to be sent between operators.
+So for Flink's main use case of stream processing, where each record is handled separately by different operators,
+the current approach of an initial conversion into `HoodieRecord` becomes unacceptable.
+
+To prevent excessive costs, conversion into `HoodieRecord` should be done only in the operators that actually perform the write into HDFS, S3, etc.
+This also enables future optimizations such as writing Flink `RowData` directly to Parquet files, if needed, without any intermediate conversions.
+In Flink pipelines we could keep the internal `RowData` together with only the necessary Hudi metadata.
+This metadata should be added with Flink data types in mind.
+
+There are seven different categories of
+[supported data types](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)
+in Flink: tuples, POJOs, primitive types, regular classes, values, Hadoop writables, and special types.
+Among these, tuples are the least flexible but offer the best performance.
+With tuples there is no need for a custom type description or a custom serializer; we can use Flink's existing
+[`TupleTypeInfo`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java)
+and
+[`TupleSerializer`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java).
+(All links to Flink documentation and code are for version 1.20.)
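+
+As a minimal sketch, the tuple type for "metadata plus payload" could be declared as shown below. It assumes a metadata sub-tuple of two fields, e.g. record key and partition path; plain `String` is used instead of `StringData` for brevity.
+
+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.data.RowData;
+
+public final class RecordTypes {
+
+  /**
+   * Type info for Tuple2<(recordKey, partitionPath), payload>.
+   * For a TupleTypeInfo, Flink picks its built-in TupleSerializer,
+   * so no Kryo fallback and no custom serializer are involved.
+   */
+  public static TypeInformation<Tuple2<Tuple2<String, String>, RowData>> recordType(
+      TypeInformation<RowData> payloadType) {
+    TupleTypeInfo<Tuple2<String, String>> metaType =
+        new TupleTypeInfo<>(Types.STRING, Types.STRING);
+    return new TupleTypeInfo<>(metaType, payloadType);
+  }
+}
+```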
+
+## Implementation
+
+To prepare the implementation plan, we should start with a review of the current state.
+
+### Write
+
+All scenarios of Flink writing into Hudi can be presented in the diagram below:
+
+![`DataStream` for `HoodieTableSink`](datastream_hoodietablesink.png)
+
+There are two special cases for processing: bulk insert, and append mode (the insert operation into MOR and COW without inline clustering), shown in the lower part.
+For both of these cases there is no conversion of `RowData` into `HoodieRecord`, so our main focus should be on the upper part of the diagram.
+
+Flink automatically chains operators together where possible, which means that one operator combines multiple transformations.
+Within a chained operator there is no serialization/deserialization between the combined transformations.
+But transformations separated by partitioners, like
+[`keyBy()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L304-L308),
+which uses
+[`KeyGroupStreamPartitioner`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java),
+or
+[`partitionCustom()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L398-L402),
+which expects a user-defined partitioner, cannot be chained.
+Those partitioners are marked as purple blocks in the diagram, and at those points we face high serialization/deserialization costs.
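+
+A minimal sketch of where this serde boundary appears (`keyByExample` and `extractKey` are hypothetical placeholders, not existing Hudi code):
+
+```java
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+public final class ShuffleBoundary {
+
+  static DataStream<Tuple2<String, RowData>> keyByExample(DataStream<RowData> input) {
+    TupleTypeInfo<Tuple2<String, RowData>> keyedType =
+        new TupleTypeInfo<>(Types.STRING, input.getType());
+    return input
+        // Chained with the upstream operator: no serde on this edge.
+        .map(r -> Tuple2.of(extractKey(r), r))
+        .returns(keyedType)
+        // keyBy() inserts a KeyGroupStreamPartitioner; this edge cannot be
+        // chained, so every record crossing it is serialized/deserialized
+        // with keyedType's serializer (TupleSerializer, not Kryo).
+        .keyBy(t -> t.f0, Types.STRING)
+        .map(t -> t)
+        .returns(keyedType);
+  }
+
+  // Hypothetical key extraction; a real pipeline would derive this
+  // from the configured Hudi key generator.
+  static String extractKey(RowData row) {
+    return row.getString(0).toString();
+  }
+}
+```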
+
+### Read
+
+As for reading a Hudi table with Flink, at first blush there is no need to implement such optimizations there.
+
+### Metadata
+
+We start writing into a Hudi table from a `DataStream<RowData>`.
+The Hudi metadata necessary for processing is marked in red in the diagram above.
+We could use the
+[`map()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L611-L614)
+transformation to convert the incoming `RowData` into `Tuple(Tuple(StringData, ..., StringData), RowData)`
+instead of `HoodieRecord`, where the first sub-tuple contains the necessary metadata.
+In this case the return type could be described as `new TupleTypeInfo<>(new TupleTypeInfo<>(...), dataStream.getType())`, which forces Flink to use its internal `TupleSerializer`.
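+
+A sketch of this conversion follows; the `recordKey` and `partitionPath` helpers are hypothetical, and plain `String` again stands in for `StringData`:
+
+```java
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+public final class MetadataMapper {
+
+  static DataStream<Tuple2<Tuple2<String, String>, RowData>> attachMetadata(
+      DataStream<RowData> dataStream) {
+    return dataStream
+        .map(row -> Tuple2.of(
+            Tuple2.of(recordKey(row), partitionPath(row)), // metadata sub-tuple
+            row))                                          // payload, untouched
+        // Declaring the result type explicitly keeps Flink on TupleSerializer
+        // instead of letting type extraction fall back to Kryo.
+        .returns(new TupleTypeInfo<>(
+            new TupleTypeInfo<>(Types.STRING, Types.STRING),
+            dataStream.getType()));
+  }
+
+  // Hypothetical helpers; see "Potential problems" below for how the
+  // actual key generators fit in.
+  static String recordKey(RowData row)     { return row.getString(0).toString(); }
+  static String partitionPath(RowData row) { return row.getString(1).toString(); }
+}
+```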
+
+### Potential problems
+
+1. Key generators are tightly coupled with Avro `GenericRecord`.
+   Therefore, to support all key generators, we will have to do an intermediate conversion into Avro inside the operator responsible for computing the Hudi key, as sketched below.
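+
+   A sketch of this contained conversion (`rowDataToAvro` is a hypothetical converter; `KeyGenerator#getKey` is the existing Hudi API):
+
+   ```java
+   import org.apache.avro.generic.GenericRecord;
+   import org.apache.flink.table.data.RowData;
+   import org.apache.hudi.common.model.HoodieKey;
+   import org.apache.hudi.keygen.KeyGenerator;
+
+   // The GenericRecord stays local to the key-generation operator: it is
+   // created, used to compute the key, and discarded, so it is never
+   // serialized or shuffled downstream.
+   static HoodieKey hoodieKey(RowData row, KeyGenerator keyGenerator) {
+     GenericRecord avro = rowDataToAvro(row); // hypothetical RowData-to-Avro converter
+     return keyGenerator.getKey(avro);        // existing Hudi KeyGenerator API
+   }
+   ```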

Review Comment:
   @danny0405, the PR is ready: https://github.com/apache/hudi/pull/12722.
   In the description of that PR, I've described the costs of this extra conversion:
   > These costs could be accepted for now due to acceptable values: 5 798 CPU samples out of 183 236 in total, which is about 3%.
   
   Total performance improvement:
   - total write time decreased from 344 s to 265 s, which is about 23%,
   - data passed between Flink operators decreased from 19.4 GB to 12.9 GB, which is about 33.5%.


