This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 41392e119f  [RFC-46][HUDI-4414] Update the RFC-46 doc to fix comments feedback (#6132)
41392e119f is described below
commit 41392e119fcc7c7433d415d70b3800bc3dbf0e2b
Author: komao <[email protected]>
AuthorDate: Thu Sep 22 11:17:54 2022 +0800
[RFC-46][HUDI-4414] Update the RFC-46 doc to fix comments feedback (#6132)
* Update the RFC-46 doc to fix comments feedback
* fix
Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
rfc/rfc-46/rfc-46.md | 169 ++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 134 insertions(+), 35 deletions(-)
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index a851a4443a..192bdbf8c6 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -38,7 +38,7 @@ when dealing with records (during merge, column value extractions, writing into
While having a single format of the record representation is certainly making implementation of some components simpler,
it bears unavoidable performance penalty of de-/serialization loop: every record handled by Hudi has to be converted
-from (low-level) engine-specific representation (`Row` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate
+from (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate
one (Avro), with some operations (like clustering, compaction) potentially incurring this penalty multiple times (on read- and write-paths).
@@ -84,59 +84,105 @@ is known to have poor performance (compared to non-reflection based instantiatio
#### Record Merge API
-Stateless component interface providing for API Combining Records will look like following:
+CombineAndGetUpdateValue and Precombine will converge into one API. The stateless component interface providing the record-combining API will look like the following:
```java
-interface HoodieMerge {
-   HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
-
-   Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
-}
+interface HoodieRecordMerger {
   /**
-   * Spark-specific implementation
+   * The kind of merging strategy this recordMerger belongs to. A UUID identifies the merging strategy.
    */
-  class HoodieSparkRecordMerge implements HoodieMerge {
+  String getMergingStrategy();
+
+  // This method converges combineAndGetUpdateValue and precombine from HoodieRecordPayload.
+  // It'd be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as: given 3 versions A, B, C of a single record, both orders of applying the operation have to yield the same result)
+  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+
+  // The record type handled by the current merger
+  // SPARK, AVRO, FLINK
+  HoodieRecordType getRecordType();
+}

-     @Override
-     public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieSparkRecords preCombine
-     }
+/**
+ * Spark-specific implementation
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // HoodieSparkRecord precombine and combineAndGetUpdateValue. It'd be an associative operation.
+  }

-     @Override
-     public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-        // HoodieSparkRecord combineAndGetUpdateValue
-     }
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
   }
+}

-  /**
-   * Flink-specific implementation
-   */
-  class HoodieFlinkRecordMerge implements HoodieMerge {
-
-     @Override
-     public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieFlinkRecord preCombine
-     }
+/**
+ * Flink-specific implementation
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // HoodieFlinkRecord precombine and combineAndGetUpdateValue. It'd be an associative operation.
+  }

-     @Override
-     public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-        // HoodieFlinkRecord combineAndGetUpdateValue
-     }
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.FLINK;
   }
+}
```
Where user can provide their own subclass implementing such interface for the engines of interest.
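To make the contract concrete, here is a rough usage sketch of how an engine-side write path might apply a configured merger. This is an illustration only: `mergerRegistry` and `writeMerged` are hypothetical names, while `HoodieRecordMerger`, `HoodieRecord`, and `Option` come from the interface above.

```java
// Hypothetical sketch (not part of the RFC): applying a configured merger while
// reconciling an incoming record version against the stored one.
HoodieRecordMerger merger = mergerRegistry.lookup(HoodieRecordType.SPARK); // hypothetical lookup
Option<HoodieRecord> merged = merger.merge(olderRecord, newerRecord, schema, props);
if (merged.isPresent()) {
  writeMerged(merged.get()); // hypothetical sink for the surviving record
}
// An empty Option signals the record should not be written out (e.g. a delete).
```

Since `merge` is associative, record versions can be combined pairwise in any grouping across base and log files and still converge to the same result.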
-#### Migration from `HoodieRecordPayload` to `HoodieMerge`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
To warrant backward-compatibility (BWC) on the code-level with already created subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieMerge`, that will
+already used in production by Hudi users, we will provide a BWC-bridge in the form of an instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, that will
be using user-defined subclass of `HoodieRecordPayload` to combine the records.
-Leveraging such bridge will make provide for seamless BWC migration to the 0.11 release, however will be removing the performance
+Leveraging such bridge will provide for seamless BWC migration to the 0.11 release, however it will remove the performance
benefit of this refactoring, since it would unavoidably have to perform conversion to intermediate representation (Avro). To realize
full-suite of benefits of this refactoring, users will have to migrate their merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieMerge` implementation.
+new `HoodieRecordMerger` implementation.
+
+Precombine is used to merge records from the logs or incoming records; CombineAndGetUpdateValue is used to merge a record from a log file with the corresponding record from a base file.
+These two merge flows are unified in `HoodieAvroRecordMerger` as the `merge` function. `HoodieAvroRecordMerger`'s API will look like the following:
+
+```java
+/**
+ * Backward-compatibility implementation based on HoodieRecordPayload
+ */
+class HoodieAvroRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // HoodieAvroRecordMerger precombine and combineAndGetUpdateValue. It'd be an associative operation.
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.AVRO;
+  }
+}
+```
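As a rough illustration of what the bridge does internally (a sketch under assumptions, not the actual implementation), `merge` could delegate to the user's payload along these lines; `getPayload` and `toAvroIndexedRecord` are hypothetical helpers:

```java
// Hypothetical sketch: how the BWC bridge could translate the unified merge()
// call back into the legacy HoodieRecordPayload calls.
// getPayload(...) and toAvroIndexedRecord(...) are illustrative helpers, not real APIs.
public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
  HoodieRecordPayload olderPayload = getPayload(older);
  HoodieRecordPayload newerPayload = getPayload(newer);
  // Legacy step 1: preCombine picks/combines among competing incoming versions.
  HoodieRecordPayload picked = (HoodieRecordPayload) newerPayload.preCombine(olderPayload);
  // Legacy step 2: combineAndGetUpdateValue merges against the stored Avro record.
  Option<IndexedRecord> combined = picked.combineAndGetUpdateValue(toAvroIndexedRecord(older), schema, props);
  return combined.map(rec -> (HoodieRecord) new HoodieAvroIndexedRecord(rec));
}
```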
### Refactoring Flows Directly Interacting w/ Records:
@@ -156,13 +202,66 @@ Following major components will be refactored:
3. `HoodieRealtimeRecordReader`s
1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
+### Config for RecordMerger
+The RecordMerger is engine-aware. We provide a config called MERGER_IMPLS, which takes a list of RecordMerger class names, and a config called MERGER_STRATEGY, which is the UUID of a merging strategy. At runtime, Hudi will pick the RecordMerger from MERGER_IMPLS whose MERGER_STRATEGY matches and which handles the record type of the engine in use.
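For instance, a Spark writer could wire these two configs roughly as below. This is a hypothetical usage sketch: the option key strings and the UUID value are placeholders standing in for MERGER_IMPLS and MERGER_STRATEGY, not finalized names.

```java
// Hypothetical sketch: configuring candidate mergers and the strategy UUID on a Spark write.
// The option keys and UUID below are placeholders for MERGER_IMPLS / MERGER_STRATEGY.
df.write().format("hudi")
    .option("hoodie.write.merger.impls",
        "org.apache.hudi.HoodieSparkRecordMerger,org.apache.hudi.HoodieAvroRecordMerger")
    .option("hoodie.write.merger.strategy", "00000000-0000-0000-0000-000000000000") // placeholder UUID
    .save(basePath);
```

Listing both the Spark-native merger and the Avro bridge under the same strategy lets Hudi select whichever implementation matches the runtime engine's record type.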
+
+### Public API in HoodieRecord
+Because we implement different types of records, we need to implement functionality similar to AvroUtils in HoodieRecord for the different data representations (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+class HoodieRecord {
+
+  /**
+   * Get column values from the record to support RDDCustomColumnsSortPartitioner
+   */
+  Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
+
+  /**
+   * Support bootstrap.
+   */
+  HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+
+  /**
+   * Rewrite record into new schema (add meta columns)
+   */
+  HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;
+
+  /**
+   * Support schema evolution.
+   */
+  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;
+
+  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException;
+
+  HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;
+
+  boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+  /**
+   * Whether this is an empty record (generated by ExpressionPayload) that should be ignored.
+   */
+  boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
+
+  Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
+
+  // Other functions with getters or setters ...
+}
+```
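As a usage illustration (a sketch: `record`, `emit`, and the rename mapping are made up for the example, while the methods themselves are the ones declared above), a reader applying schema evolution could drive this API like so:

```java
// Hypothetical sketch: evolving a record to the reader schema and filtering it on the read path.
Map<String, String> renamedColumns = Collections.singletonMap("fare", "fare_amount"); // illustrative rename
HoodieRecord evolved = record.rewriteRecordWithNewSchema(writerSchema, props, readerSchema, renamedColumns);
if (!evolved.isDelete(readerSchema, props) && !evolved.shouldIgnore(readerSchema, props)) {
  emit(evolved); // hypothetical downstream handoff in the engine-native format
}
```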
## Rollout/Adoption Plan
- What impact (if any) will there be on existing users?
  - Users of the Hudi will observe considerably better performance for most of the routine operations: writing, reading, compaction, clustering, etc due to avoiding the superfluous intermediate de-/serialization penalty
  - By default, modified hierarchy would still leverage
-  - Users will need to rebase their logic of combining records by creating a subclass of `HoodieRecordPayload`, and instead subclass newly created interface `HoodieMerge` to get full-suite of performance benefits
+  - Users will need to rebase their logic of combining records by creating a subclass of `HoodieRecordPayload`, and instead subclass newly created interface `HoodieRecordMerger` to get full-suite of performance benefits
- If we are changing behavior how will we phase out the older behavior?
  - Older behavior leveraging `HoodieRecordPayload` for merging will be marked as deprecated in 0.11, and subsequently removed in 0.1x
- If we need special migration tools, describe them here.