alexeykudinkin commented on code in PR #6132:
URL: https://github.com/apache/hudi/pull/6132#discussion_r938175507


##########
rfc/rfc-46/rfc-46.md:
##########
@@ -74,49 +74,94 @@ Following (high-level) steps are proposed:
    2. Split into interface and engine-specific implementations (holding 
internal engine-specific representation of the payload) 
    3. Implementing new standardized record-level APIs (like `getPartitionKey` 
, `getRecordKey`, etc)
    4. Staying **internal** component, that will **NOT** contain any 
user-defined semantic (like merging)
-2. Extract Record Combining (Merge) API from `HoodieRecordPayload` into a 
standalone, stateless component (engine). Such component will be
+2. Extract the Record Merge API from `HoodieRecordPayload` into a standalone, stateless component. Such a component will be
    1. Abstracted as stateless object providing API to combine records 
(according to predefined semantics) for engines (Spark, Flink) of interest
    2. Plug-in point for user-defined combination semantics
 3. Gradually deprecate, phase-out and eventually remove `HoodieRecordPayload` 
abstraction
 
 Phasing out usage of `HoodieRecordPayload` will also bring the benefit of 
avoiding to use Java reflection in the hot-path, which
 is known to have poor performance (compared to non-reflection based 
instantiation).
 
-#### Combine API Engine
+#### Record Merge API
 
-Stateless component interface providing for API Combining Records will look 
like following:
+`combineAndGetUpdateValue` and `precombine` will converge into one API. The stateless component interface providing the Record Merge API will look like the following:
 
 ```java
-interface HoodieRecordCombiningEngine {
-  
-  default HoodieRecord precombine(HoodieRecord older, HoodieRecord newer) {
-    if (spark) {
-      precombineSpark((SparkHoodieRecord) older, (SparkHoodieRecord) newer);
-    } else if (flink) {
-      // precombine for Flink
-    }
-  }
+interface HoodieRecordMerger {
+   // This method converges combineAndGetUpdateValue and precombine from 
HoodiePayload. 
+   // It'd be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as: given 3 versions A, B, C of a single record, both orders of applying the operation have to yield the same result)
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException;
+   
+   // The record type handled by the current merger
+   // SPARK, AVRO, FLINK
+   HoodieRecordType getRecordType();
+}
 
-   /**
-    * Spark-specific implementation 
-    */
-  SparkHoodieRecord precombineSpark(SparkHoodieRecord older, SparkHoodieRecord 
newer);
-  
-  // ...
+/**
+ * Spark-specific implementation 
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {

Review Comment:
   Please check my comment in here:
   https://github.com/apache/hudi/pull/5629#discussion_r938154747
   
   We should allow users to implement a single `RecordMerger` object supporting every engine type.
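
   For illustration, such a single merger could dispatch on the record type while keeping the merge semantics engine-agnostic. This is only a sketch with simplified stand-in types (`HoodieRecordType`, `HoodieRecord`, and `SimpleRecord` below are minimal assumptions for the example, not the real Hudi classes):

```java
import java.util.Optional;

// Minimal stand-in types for illustration only; the real Hudi classes differ.
enum HoodieRecordType { AVRO, SPARK, FLINK }

interface HoodieRecord {
  HoodieRecordType getRecordType();
  long getOrderingValue();
}

// One merger implementation covering every engine type: the merge semantics
// here ("latest ordering value wins") do not depend on the engine's internal
// record representation, so a single class can serve all engines.
class UniversalRecordMerger {
  Optional<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer) {
    // Associative, engine-agnostic "latest wins" semantics.
    return Optional.of(newer.getOrderingValue() >= older.getOrderingValue() ? newer : older);
  }
}

// Simple concrete record used purely for demonstration.
class SimpleRecord implements HoodieRecord {
  private final HoodieRecordType type;
  private final long orderingValue;

  SimpleRecord(HoodieRecordType type, long orderingValue) {
    this.type = type;
    this.orderingValue = orderingValue;
  }

  @Override public HoodieRecordType getRecordType() { return type; }
  @Override public long getOrderingValue() { return orderingValue; }
}
```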



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +173,88 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro 
conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record 
representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to implement functionality similar to AvroUtils in HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,

Review Comment:
   This should return an array of objects, right?
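
   A sketch of the array-returning shape being suggested, using a map-backed record purely for illustration (the real `HoodieRecord` implementations read engine-native rows and also take an Avro `Schema`):

```java
import java.util.Map;

// Illustrative only: shows the Object[]-returning signature suggested above.
class MapBackedRecord {
  private final Map<String, Object> fields;

  MapBackedRecord(Map<String, Object> fields) {
    this.fields = fields;
  }

  // One value per requested column, in the order the columns were asked for.
  Object[] getRecordColumnValues(String[] columns, boolean consistentLogicalTimestampEnabled) {
    Object[] values = new Object[columns.length];
    for (int i = 0; i < columns.length; i++) {
      values[i] = fields.get(columns[i]);
    }
    return values;
  }
}
```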



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +173,88 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro 
conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record 
representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to implement functionality similar to AvroUtils in HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws 
IOException;
+
+   /**
+    * Rewrite record into new schema(add meta columns)
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema 
targetSchema)
+           throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema,
+           Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord updateValues(Schema recordSchema, Properties props,

Review Comment:
   I'd suggest to:
   
   1. Name it `updateMetadataValues` to avoid confusion (we shouldn't allow modifying the record's payload)
   2. Instead of `Map<String, String>`, let's create a strongly typed Java class w/ all meta-fields and pass it here
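
   A strongly typed container along those lines might look like this (the field set mirrors Hudi's standard meta-columns, but the class name and setter style below are assumptions, not the eventual API):

```java
// Hypothetical strongly typed replacement for Map<String, String>: one field
// per Hudi meta-column, so callers can't pass arbitrary or misspelled keys.
class MetadataValues {
  private String commitTime;      // _hoodie_commit_time
  private String commitSeqno;     // _hoodie_commit_seqno
  private String recordKey;       // _hoodie_record_key
  private String partitionPath;   // _hoodie_partition_path
  private String fileName;        // _hoodie_file_name

  MetadataValues setCommitTime(String v) { this.commitTime = v; return this; }
  MetadataValues setCommitSeqno(String v) { this.commitSeqno = v; return this; }
  MetadataValues setRecordKey(String v) { this.recordKey = v; return this; }
  MetadataValues setPartitionPath(String v) { this.partitionPath = v; return this; }
  MetadataValues setFileName(String v) { this.fileName = v; return this; }

  String getCommitTime() { return commitTime; }
  String getCommitSeqno() { return commitSeqno; }
  String getRecordKey() { return recordKey; }
  String getPartitionPath() { return partitionPath; }
  String getFileName() { return fileName; }
}
```

   With this shape, `updateMetadataValues(schema, props, metadataValues)` can only touch meta-fields, which also enforces the "don't modify the payload" constraint from point 1.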



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +173,88 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro 
conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record 
representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to implement functionality similar to AvroUtils in HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws 
IOException;

Review Comment:
   As we've discussed previously, we should avoid adding any merging semantics to the Record API itself. What is this method going to be used for?



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +173,88 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro 
conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record 
representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to implement functionality similar to AvroUtils in HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws 
IOException;
+
+   /**
+    * Rewrite record into new schema(add meta columns)
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema 
targetSchema)
+           throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema,
+           Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord updateValues(Schema recordSchema, Properties props,
+           Map<String, String> metadataValues) throws IOException;
+
+   boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+   /**
+    * Is EmptyRecord. Generated by ExpressionPayload.
+    */
+   boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException;
+
+   /**
+    * This method is used to extract the HoodieKey from the given parameters.
+    */
+   HoodieRecord getKeyWithParams(

Review Comment:
   Great progress on this method! 
   
   Its arguments still don't make sense from the user perspective, though: looking at it, I can't imagine where it could even be used. Can you please point me to where this method is planned to be used?



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +173,88 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro 
conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record 
representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to implement functionality similar to AvroUtils in HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws 
IOException;
+
+   /**
+    * Rewrite record into new schema(add meta columns)
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema 
targetSchema)
+           throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema,
+           Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord updateValues(Schema recordSchema, Properties props,
+           Map<String, String> metadataValues) throws IOException;
+
+   boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+   /**
+    * Is EmptyRecord. Generated by ExpressionPayload.
+    */
+   boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException;
+
+   /**
+    * This method is used to extract the HoodieKey from the given parameters.
+    */
+   HoodieRecord getKeyWithParams(
+           Schema schema,
+           Properties props,
+           Option<Pair<String, String>>simpleKeyGenFieldsOpt,
+           Boolean withOperation,
+           Option<String> partitionNameOp,
+           Boolean populateMetaFieldsOp) throws IOException;
+
+   /**
+    * This method is used to extract the HoodieKey via a keyGenerator. It is used in ClusteringExecutionStrategy.
+    */
+   HoodieRecord getKeyWithKeyGen(Properties props, Option<BaseKeyGenerator> 
keyGen);
+
+   Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties 
props)

Review Comment:
   Why do we need `HoodieAvroIndexedRecord`? 
   I think `HoodieAvroRecord` should be enough



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -156,13 +187,76 @@ Following major components will be refactored:
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default merger class will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to move some functions from AvroUtils into HoodieRecord for each data representation (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean 
consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other) throws IOException;
+
+   /**
+    * Rewrite record into new schema(add meta columns)
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema 
targetSchema) throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema, Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord addMetadataValues(Schema recordSchema, Properties props, 
Map<HoodieMetadataField, String> metadataValues) throws IOException;
+
+   /**
+    * Is deleted.
+    */
+   boolean isPresent(Schema recordSchema, Properties props) throws IOException;
+
+   /**
+    * Is EmptyRecord. Generated by ExpressionPayload.
+    */
+   boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException;

Review Comment:
   We should probably update the java-docs to avoid referencing any particular implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
