alexeykudinkin commented on code in PR #6132:
URL: https://github.com/apache/hudi/pull/6132#discussion_r926263553
##########
rfc/rfc-46/rfc-46.md:
##########
@@ -156,13 +187,76 @@ Following major components will be refactored:
3. `HoodieRealtimeRecordReader`s
1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
+### Config for Record Merge
+The MERGE_CLASS_NAME config is engine-aware. If MERGE_CLASS_NAME is not specified, a default will be chosen according to your engine type.
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to move some functions from AvroUtils into HoodieRecord so they can operate on the different data representations (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+class HoodieRecord {
+
+ /**
+ * Get column values from the record, to support RDDCustomColumnsSortPartitioner.
+ */
+ Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
+
+ /**
+ * Merges this record with another; used to support bootstrap.
+ */
+ HoodieRecord mergeWith(HoodieRecord other) throws IOException;
+
+ /**
+ * Rewrite record into a new schema (add meta columns).
+ */
+ HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;
+
+ /**
+ * Support schema evolution.
+ */
+ HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;
+
+ HoodieRecord addMetadataValues(Schema recordSchema, Properties props, Map<HoodieMetadataField, String> metadataValues) throws IOException;
+
+ /**
+ * Whether the record is present, i.e. not a delete.
+ */
+ boolean isPresent(Schema recordSchema, Properties props) throws IOException;
+
+ /**
+ * Whether this is an empty record generated by ExpressionPayload that should be ignored.
+ */
+ boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
+
+ /**
+ * Extracts the HoodieKey without going through the key generator.
+ */
+ HoodieRecord expansion(
Review Comment:
Please check my comment in the PR where it's introduced (https://github.com/apache/hudi/pull/5629#discussion_r926262756). I don't think we should have this method, and I believe we should be able to get away without it in any form.
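To make the `renameCols` semantics of `rewriteRecordWithNewSchema` concrete, here is a minimal standalone sketch. The `RewriteWithNewSchemaSketch` class, the map-based record, and the field names are illustrative stand-ins assumed for this example, not Hudi code; the real API operates on engine-native rows and Avro `Schema` objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch: a record is modeled as an ordered field map so the
// renameCols behavior can be demonstrated without Hudi on the classpath.
class RewriteWithNewSchemaSketch {

  // renameCols maps a field name in the new schema to its old name, so that a
  // renamed column's value is carried over during schema evolution.
  static Map<String, Object> rewriteRecordWithNewSchema(
      Map<String, Object> oldRecord,
      List<String> newSchemaFields,
      Map<String, String> renameCols) {
    Map<String, Object> rewritten = new LinkedHashMap<>();
    for (String field : newSchemaFields) {
      String sourceField = renameCols.getOrDefault(field, field);
      // Fields newly added by evolution have no source value and default to null.
      rewritten.put(field, oldRecord.get(sourceField));
    }
    return rewritten;
  }

  public static void main(String[] args) {
    Map<String, Object> old = new LinkedHashMap<>();
    old.put("uuid", "id-1");
    old.put("fare", 27.7);

    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("amount", "fare"); // 'fare' was renamed to 'amount'

    System.out.println(rewriteRecordWithNewSchema(
        old, Arrays.asList("uuid", "amount", "rider"), renameCols));
    // prints {uuid=id-1, amount=27.7, rider=null}
  }
}
```

The renamed column keeps its value under the new name, while a field added by evolution (`rider`) comes back null.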
##########
rfc/rfc-46/rfc-46.md:
##########
@@ -84,59 +84,90 @@ is known to have poor performance (compared to non-reflection based instantiatio
#### Record Merge API
-Stateless component interface providing for API Combining Records will look like following:
+CombineAndGetUpdateValue and Precombine will converge into one API. The stateless component interface for combining records will look like the following:
```java
-interface HoodieMerge {
- HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
-
- Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+interface HoodieRecordMerger {
+ // combineAndGetUpdateValue and precombine
+ Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+
+ // The record type handled by the current merger
+ // SPARK, AVRO, FLINK
+ HoodieRecordType getRecordType();
}
- /**
- * Spark-specific implementation
- */
- class HoodieSparkRecordMerge implements HoodieMerge {
+/**
+ * Spark-specific implementation
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
- @Override
- public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
- // HoodieSparkRecords preCombine
- }
+ @Override
+ public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ // HoodieSparkRecord precombine and combineAndGetUpdateValue
+ }
- @Override
- public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
- // HoodieSparkRecord combineAndGetUpdateValue
- }
+ @Override
+ public HoodieRecordType getRecordType() {
+ return HoodieRecordType.SPARK;
}
+}
- /**
- * Flink-specific implementation
- */
- class HoodieFlinkRecordMerge implements HoodieMerge {
-
- @Override
- public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
- // HoodieFlinkRecord preCombine
- }
+/**
+ * Flink-specific implementation
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+ @Override
+ public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ // HoodieFlinkRecord precombine and combineAndGetUpdateValue
+ }
- @Override
- public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
- // HoodieFlinkRecord combineAndGetUpdateValue
- }
+ @Override
+ public HoodieRecordType getRecordType() {
+ return HoodieRecordType.FLINK;
}
+}
```
Where user can provide their own subclass implementing such interface for the engines of interest.
-#### Migration from `HoodieRecordPayload` to `HoodieMerge`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
To warrant backward-compatibility (BWC) on the code-level with already created subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieMerge`, that will
+already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, that will
be using user-defined subclass of `HoodieRecordPayload` to combine the records.
Leveraging such bridge will make provide for seamless BWC migration to the 0.11 release, however will be removing the performance
benefit of this refactoring, since it would unavoidably have to perform conversion to intermediate representation (Avro). To realize
full-suite of benefits of this refactoring, users will have to migrate their merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieMerge` implementation.
+new `HoodieRecordMerger` implementation.
+
+Precombine is used to merge records from log files or among incoming records; combineAndGetUpdateValue is used to merge a record from a log file with a record from a base file.
+These two merge logics are not exactly the same for some RecordPayloads, such as OverwriteWithLatestAvroPayload.
+We add an enum in HoodieRecord to mark where the record comes from (BASE, LOG or WRITE). `HoodieAvroRecordMerger`'s API will look like the following:
Review Comment:
I think we should qualify the existing behavior as simply a bug and resolve it with this, instead of going out of the way to maintain BWC.
What's your take, folks @vinothchandar @prasannarajaperumal?
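For readers following this thread, the converged merge API discussed above can be sketched standalone. `SimpleRecord`, `SimpleRecordMerger`, and the OverwriteWithLatest-style tie-breaking below are assumptions made for illustration, not Hudi's actual classes; the point is that one `merge(...)` entry point covers both the old preCombine (log/log) and combineAndGetUpdateValue (base/log) paths.

```java
import java.util.Optional;

// Standalone sketch of a unified record merger. Real Hudi types (HoodieRecord,
// Option, Schema, Properties) are replaced with minimal local stand-ins.
class RecordMergerSketch {

  static class SimpleRecord {
    final String payload;     // stand-in for the engine-native row
    final long orderingValue; // stand-in for the precombine field
    final boolean isDelete;

    SimpleRecord(String payload, long orderingValue, boolean isDelete) {
      this.payload = payload;
      this.orderingValue = orderingValue;
      this.isDelete = isDelete;
    }
  }

  interface SimpleRecordMerger {
    // An empty result means the merged record resolves to a delete.
    Optional<SimpleRecord> merge(SimpleRecord older, SimpleRecord newer);
  }

  // OverwriteWithLatest-style semantics: keep whichever side has the higher
  // ordering value, preferring the newer record on ties; drop the record
  // entirely if the winner is a delete.
  static class OverwriteWithLatestMerger implements SimpleRecordMerger {
    @Override
    public Optional<SimpleRecord> merge(SimpleRecord older, SimpleRecord newer) {
      SimpleRecord winner = newer.orderingValue >= older.orderingValue ? newer : older;
      return winner.isDelete ? Optional.empty() : Optional.of(winner);
    }
  }

  public static void main(String[] args) {
    SimpleRecordMerger merger = new OverwriteWithLatestMerger();
    SimpleRecord base = new SimpleRecord("v1", 1L, false);
    SimpleRecord update = new SimpleRecord("v2", 2L, false);
    System.out.println(merger.merge(base, update).get().payload); // prints v2
  }
}
```

The same call works regardless of whether `older` came from a base file or a log file, which is what makes the BASE/LOG/WRITE origin enum discussed above sufficient for payloads whose two merge paths differ.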
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]