geserdugarov commented on code in PR #12697:
URL: https://github.com/apache/hudi/pull/12697#discussion_r1960099516


##########
rfc/rfc-84/rfc-84.md:
##########
@@ -24,8 +24,335 @@
 ## Approvers
 
 - @danny0405
-- @...
+- @xiarixiaoyao
+- @yuzhaojing
+- @wombatu-kun
 
-## Status: Claim
+## Status
+
+Design is under discussion.
+
+Implementation is ready for non bucket and simple bucket, and wait for review:
+https://github.com/apache/hudi/pull/12796
 
 JIRA: [HUDI-8799](https://issues.apache.org/jira/browse/HUDI-8799)
+
+## Abstract
+
+Currently, in the majority of scenarios when Flink writes into Hudi, the first 
step is row data conversion into Avro record, which is used for key generation, 
+and passed to the following operators as part of `HoodieRecord`. Kryo 
serializer is used to serialize/deserialize those records. 
+And as it mentioned in the [claim for this 
RFC](https://github.com/apache/hudi/pull/12550), Kryo serde costs are huge, 
which is unacceptable for stream processing.
+
+This RFC suggests to implement data processing with keeping focus on 
performance for Flink, and considering Flink's internal data types and 
serialization.
+
+## Background
+
+Currently, `HoodieRecord` is chosen as standardized API for interacting with a 
single record, see [RFC-46](../rfc-46/rfc-46.md). 
+But `HoodieRecord` complex structure leads to high 
serialization/deserialization costs if it needed to be sent.
+So for Flink main use case scenario of stream processing, when we handle each 
record separately on different operators, 
+current approach with initial conversion into `HoodieRecord` becomes 
unacceptable.
+
+Conversion into `HoodieRecord` should be done only in operators, which 
actually perform write into HDFS, S3, etc., to prevent excessive costs. 
+And also it allows to implement future optimizations with direct write of 
Flink `RowData` to parquet files if it needed without any intermediate 
conversions.
+In Flink pipelines we could keep internal `RowData` together with only 
necessary Hudi metadata.
+And these metadata should be added considering Flink data types.
+
+There are seven different categories of 
+[supported data 
types](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)
 
+in Flink: tuples, POJOs, primitive types, regular classes, values, Hadoop 
writables, and special types. 
+Among presented data types, tuples are less flexible, but offer the best 
performance. 
+In this case there is no need in custom type description and custom 
serializer, and we could use already presented in Flink 
+[`TupleTypeInfo`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java)
 
+and 
+[`TupleSerializer`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java).
+(All links to Flink documentation or code are provided for version 1.20.) 
+
+## Implementation
+
+To prepare implementation plan we should start from current state review.
+
+### Write
+
+All scenarios of Flink writes into Hudi could be presented in a schema below:
+
+![`DataStream` for `HoodieTableSink`](datastream_hoodietablesink.png)
+
+There are two special cases for processing: bulk insert, and append mode 
(insert operation into MOR and COW without inline clustering), which are seen 
in the lower part.
+For both of these cases there is no conversion of `RowData` into 
`HoodieRecord`, so our main focus should be on the upper part of the presented 
schema.
+
+Flink could automatically chain operators together if it's possible, which 
means that one operator combines multiple transformations.
+For chained operator there is no serialization/deserialization between 
combined transformations.
+But transformations separated by some partitioners, like 
+[`keyBy()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L304-L308),
 
+which uses 
+[`KeyGroupStreamPartitioner`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java),
 
+or 
+[`partitionCustom()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L398-L402),
 
+which expects user defined custom partitioner, couldn't be chained.
+Those partitioners are marked as purple blocks in the schema, and at those 
places we will face high serialization/deserialization costs.
+
+### Read
+
+As for reading of Hudi table by Flink, at the first blush there is no need to 
implement such optimizations there.
+
+### Metadata
+
+We start writing into Hudi table from `DataStream<RowData>`.
+Necessary for processing Hudi metadata is marked by red color on the schema 
above.
+We could use 
+[`map()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L611-L614)
+transformation to convert incoming `RowData` into a new object 
`HoodieFlinkInternalRow`.
+We don't use `HoodieFlinkRecord` name to prevent confusion because this class 
doesn't extend `HoodieRecord`.
+
+Structure of `HoodieFlinkInternalRow`:

Review Comment:
   Unfortunately, I don't see the corresponding commit here. It looks like the 
git cleaner runs often here, and in my local history it is hard to find the 
commit after all the rebases, squashes, and resets. I didn't save it because it 
was a dead end for me, and I started rewriting everything.
   
   To reproduce this issue, I suppose it should be enough to construct a custom 
pipeline from Flink operators: in the first operator, convert the input into 
some test data structure; then add a `keyBy` shuffle keyed on a field of that 
structure; and finally write the data stream somewhere, or maybe use a black 
hole sink.
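
   A minimal sketch of such a pipeline, assuming Flink 1.20 and its DataStream API. The class name `KeyByShuffleRepro`, the helper `keyOf`, and the use of `Tuple2` as the test structure are hypothetical placeholders, not code from the lost commit; the structure under investigation would be swapped in at the `map()` step.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

/** Hypothetical reproduction pipeline: convert -> keyBy shuffle -> discard. */
public class KeyByShuffleRepro {

  /** Hypothetical key derivation; stands in for "some data from the new data structure". */
  static String keyOf(long id) {
    return "key-" + (id % 100);
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromSequence(0, 999_999)
        // First operator: convert the input into the structure under test.
        // Tuple2 is only a placeholder (assumption); replace it with the real structure.
        .map(id -> Tuple2.of(keyOf(id), id))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        // keyBy() inserts a partitioner, so this edge cannot be chained
        // and every record is serialized and deserialized here.
        .keyBy(record -> record.f0)
        // "Black hole": discard everything that arrives after the shuffle.
        .addSink(new DiscardingSink<>());

    env.execute("keyBy shuffle reproduction sketch");
  }
}
```

   With a placeholder like `Tuple2`, Flink uses its own tuple serializer; replacing it with a type that Flink treats as a generic type should make the shuffle fall back to Kryo, which is the costly path described in the RFC.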



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

