geserdugarov commented on code in PR #12697:
URL: https://github.com/apache/hudi/pull/12697#discussion_r1971334857


##########
rfc/rfc-84/rfc-84.md:
##########
@@ -24,8 +24,344 @@
 ## Approvers
 
 - @danny0405
-- @...
+- @cshuo
 
-## Status: Claim
+## Status
 
-JIRA: [HUDI-8799](https://issues.apache.org/jira/browse/HUDI-8799)
+Main part of implementation is done:
+https://github.com/apache/hudi/pull/12796
+
+Umbrella ticket: [HUDI-8920](https://issues.apache.org/jira/browse/HUDI-8920)
+
+## Abstract
+
+Currently, in the majority of scenarios when Flink writes into Hudi, the first 
step is row data conversion into Avro record, which is used for key generation, 
+and passed to the following operators as part of `HoodieRecord`. Kryo 
serializer is used to serialize/deserialize those records. 
+And as it mentioned in the [claim for this 
RFC](https://github.com/apache/hudi/pull/12550), Kryo serde costs are huge, 
which is unacceptable for stream processing.
+
+This RFC suggests to implement data processing with keeping focus on 
performance for Flink, and considering Flink's internal data types and 
serialization.
+
+## Background
+
+Currently, `HoodieRecord` is chosen as standardized API for interacting with a 
single record, see [RFC-46](../rfc-46/rfc-46.md). 
+But `HoodieRecord` complex structure leads to high 
serialization/deserialization costs if it needed to be sent.
+So for Flink main use case scenario of stream processing, when we handle each 
record separately on different operators, 
+current approach with initial conversion into `HoodieRecord` becomes 
unacceptable.
+
+Conversion into `HoodieRecord` should be done only in operators, which 
actually perform write into HDFS, S3, etc., to prevent excessive costs. 
+And also it allows to implement future optimizations with direct write of 
Flink `RowData` to parquet files if it is needed without any intermediate 
conversions.
+In Flink pipelines we could keep internal `RowData` together with only 
necessary Hudi metadata.
+And these metadata should be added considering Flink data types.
+
+## Implementation
+
+To prepare implementation plan we should start from current state review.
+
+### Write
+
+All scenarios of Flink writes into Hudi could be presented in a schema below:
+
+![`DataStream` for `HoodieTableSink`](datastream_hoodietablesink.png)
+
+There are two special cases for processing: bulk insert, and append mode 
(insert operation into MOR and COW without inline clustering), which are seen 
in the lower part.
+For both of these cases there is no conversion of `RowData` into 
`HoodieRecord`.
+Also, Flink could automatically chain operators together if it's possible, 
which means that one operator combines multiple transformations,
+and there are no serialization/deserialization between these transformations.
+Therefore, our main focus should be on the upper part of the presented schema 
due to the fact that for bulk insert and append mode chained Flink operators 
are used.
+
+Transformations separated by some partitioners, like 
+[`keyBy()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L304-L308),
 
+which uses 
+[`KeyGroupStreamPartitioner`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java),
 
+or 
+[`partitionCustom()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L398-L402),
 
+which expects user defined custom partitioner, couldn't been chained.
+Those partitioners are marked as purple blocks in the schema above, and at 
those places we will face high serialization/deserialization costs.
+
+### Read
+
+As for reading of Hudi table by Flink, at the first blush there is no need to 
implement such optimizations there.
+
+### Metadata
+
+We start writing into Hudi table from `DataStream<RowData>`.
+Hudi metadata, which is necessary for processing in different operators, is 
marked by red color on the schema above.
+We could use 
+[`map()`](https://github.com/apache/flink/blob/b1fe7b4099497f02b4658df7c3de8e45b62b7e21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L611-L614)
+transformation to convert incoming `RowData` into a new object 
`HoodieFlinkInternalRow`.
+We don't use `HoodieFlinkRecord` name to prevent confusion, because 
`HoodieFlinkInternalRow` doesn't extend `HoodieRecord`.
+
+Structure of `HoodieFlinkInternalRow`:
+```Java
+public class HoodieFlinkInternalRow implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // the number of fields without nesting
+  protected static final int ARITY = 7;
+
+  // recordKey, partitionPath, isIndexRecord and rowData are final
+  private final StringData recordKey;
+  private final StringData partitionPath;
+  private StringData fileId;
+  private StringData instantTime;
+  private StringData operationType;
+  // there is no rowData for index record
+  private final BooleanValue isIndexRecord;
+  private final RowData rowData;
+
+  public HoodieFlinkInternalRow(String recordKey, String partitionPath, 
RowData rowData) {
+    this(recordKey, partitionPath, "", "", "", false, rowData);
+  }
+
+  // constructor for index records without row data
+  public HoodieFlinkInternalRow(String recordKey, String partitionPath, String 
fileId, String instantTime) {
+    this(recordKey, partitionPath, fileId, instantTime, "", true, null);
+  }
+
+  public HoodieFlinkInternalRow(String recordKey,
+                                String partitionPath,
+                                String fileId,
+                                String instantTime,
+                                String operationType,
+                                boolean isIndexRecord,
+                                RowData rowData) {
+    this.recordKey = StringData.fromString(recordKey);
+    this.partitionPath = StringData.fromString(partitionPath);
+    this.fileId = StringData.fromString(fileId);
+    this.instantTime = StringData.fromString(instantTime);
+    this.operationType = StringData.fromString(operationType);
+    this.isIndexRecord = new BooleanValue(isIndexRecord);
+    this.rowData = rowData;
+  }
+
+  public String getRecordKey() {}
+
+  public String getPartitionPath() {}
+
+  public void setFileId(String fileId) {}
+  public String getFileId() {}
+
+  public void setInstantTime(String instantTime) {}
+  public String getInstantTime() {}
+
+  public void setOperationType(String operationType) {}
+  public String getOperationType() {}
+
+  public boolean isIndexRecord() {}
+
+  public RowData getRowData() {}
+
+  public HoodieFlinkInternalRow copy(RowDataSerializer rowDataSerializer) {
+    return new HoodieFlinkInternalRow(
+        this.recordKey.toString(),
+        this.partitionPath.toString(),
+        this.fileId.toString(),
+        this.instantTime.toString(),
+        this.operationType.toString(),
+        this.isIndexRecord.getValue(),
+        rowDataSerializer.copy(this.rowData));
+  }
+}
+```
+
+To describe how to serialize and deserialize it properly, we also need to 
implement `HoodieFlinkInternalRowTypeInfo` and 
`HoodieFlinkInternalRowSerializer`.
+
+```Java
+public class HoodieFlinkInternalRowTypeInfo extends 
TypeInformation<HoodieFlinkInternalRow> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final RowType rowType;
+
+  public HoodieFlinkInternalRowTypeInfo(RowType rowType) {
+    this.rowType = rowType;
+  }
+
+  @Override
+  public boolean isBasicType() {}
+
+  @Override
+  public boolean isTupleType() {}
+
+  @Override
+  public int getArity() {
+    return HoodieFlinkInternalRow.ARITY;
+  }
+
+  /**
+   * Used only in Flink `CompositeType`, not used in this type
+   */
+  @Override
+  public int getTotalFields() {
+    return HoodieFlinkInternalRow.ARITY;
+  }
+
+  @Override
+  public Class<HoodieFlinkInternalRow> getTypeClass() {}
+
+  @Override
+  public boolean isKeyType() {}
+
+  @Override
+  public TypeSerializer<HoodieFlinkInternalRow> 
createSerializer(ExecutionConfig config) {
+    return new HoodieFlinkInternalRowSerializer(this.rowType);
+  }
+
+  @Override
+  public String toString() {}
+
+  @Override
+  public boolean equals(Object obj) {}
+
+  @Override
+  public int hashCode() {}
+
+  @Override
+  public boolean canEqual(Object obj) {}
+}
+```
+
+```Java
+public class HoodieFlinkInternalRowSerializer extends 
TypeSerializer<HoodieFlinkInternalRow> {
+
+  private static final long serialVersionUID = 1L;
+
+  protected RowType rowType;
+
+  protected RowDataSerializer rowDataSerializer;
+
+  protected StringDataSerializer stringDataSerializer;
+
+  public HoodieFlinkInternalRowSerializer(RowType rowType) {
+    this.rowType = rowType;
+    this.rowDataSerializer = new RowDataSerializer(rowType);
+    this.stringDataSerializer = StringDataSerializer.INSTANCE;
+  }
+
+  @Override
+  public boolean isImmutableType() {}
+
+  @Override
+  public TypeSerializer<HoodieFlinkInternalRow> duplicate() {}
+
+  @Override
+  public HoodieFlinkInternalRow createInstance() {
+    throw new UnsupportedOperationException("HoodieFlinkInternalRow doesn't 
allow creation with some defaults.");
+  }
+
+  @Override
+  public HoodieFlinkInternalRow copy(HoodieFlinkInternalRow from) {
+    return from.copy(rowDataSerializer);
+  }
+
+  @Override
+  public HoodieFlinkInternalRow copy(HoodieFlinkInternalRow from, 
HoodieFlinkInternalRow reuse) {
+    throw new UnsupportedOperationException("HoodieFlinkInternalRow doesn't 
allow reusing.");
+  }
+
+  @Override
+  public int getLength() {
+    // -1 for variable length data types
+    return -1;
+  }
+
+  @Override
+  public void serialize(HoodieFlinkInternalRow record, DataOutputView target) 
throws IOException {
+    boolean isIndexRecord = record.isIndexRecord();
+    target.writeBoolean(isIndexRecord);
+    
stringDataSerializer.serialize(StringData.fromString(record.getRecordKey()), 
target);
+    
stringDataSerializer.serialize(StringData.fromString(record.getPartitionPath()),
 target);
+    stringDataSerializer.serialize(StringData.fromString(record.getFileId()), 
target);
+    
stringDataSerializer.serialize(StringData.fromString(record.getInstantTime()), 
target);
+    
stringDataSerializer.serialize(StringData.fromString(record.getOperationType()),
 target);
+    if (!isIndexRecord) {
+      rowDataSerializer.serialize(record.getRowData(), target);
+    }
+  }
+
+  @Override
+  public HoodieFlinkInternalRow deserialize(DataInputView source) throws 
IOException {
+    boolean isIndexRecord = source.readBoolean();
+    StringData recordKey = stringDataSerializer.deserialize(source);
+    StringData partition = stringDataSerializer.deserialize(source);
+    StringData fileId = stringDataSerializer.deserialize(source);
+    StringData instantTime = stringDataSerializer.deserialize(source);
+    StringData operationType = stringDataSerializer.deserialize(source);
+    HoodieFlinkInternalRow record;
+    if (!isIndexRecord) {
+      RowData rowData = rowDataSerializer.deserialize(source);
+      record = new HoodieFlinkInternalRow(
+          recordKey.toString(),

Review Comment:
   @cshuo , here you suggested to pass `StringData` and add corresponding 
constructor. My quick check showed performance decrease for this case, but I 
postponed research of it.
   
   I've done this research, and found that the reason we should do it is 
garbage collection. When we postpone creation of Java `String` objects in 
writers, it leads to move of huge number of objects (related to `StringData`) 
from young generation to old generation. Consequently, it causes more full GC 
runs, and decreases performance.
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to