yihua commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1628560599
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java:
##########
@@ -343,19 +310,19 @@ public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> recordBuffer)
@Override
public HoodieMergedLogRecordReader<T> build() {
+ ValidationUtils.checkArgument(recordMerger != null);
+ ValidationUtils.checkArgument(recordBuffer != null);
+ ValidationUtils.checkArgument(readerContext != null);
Review Comment:
Add an error message to each validation so that a failure identifies which argument is null.
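For illustration, a minimal sketch of what validation with messages could look like. The `checkArgument` helper below is a local stand-in written for this example (it is not taken from the PR), and the message wording is only a suggestion.

```java
class BuilderValidationSketch {
  // Hypothetical stand-in for a two-argument validation helper:
  // throws with the given message when the expression is false.
  static void checkArgument(boolean expression, String errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  // Mirrors the three checks in build(); returns "ok" or the failure message
  // so the effect of each message is easy to observe.
  static String validate(Object recordMerger, Object recordBuffer, Object readerContext) {
    try {
      checkArgument(recordMerger != null, "recordMerger must not be null");
      checkArgument(recordBuffer != null, "recordBuffer must not be null");
      checkArgument(readerContext != null, "readerContext must not be null");
      return "ok";
    } catch (IllegalArgumentException e) {
      return e.getMessage();
    }
  }
}
```

With messages like these, a failed `build()` call reports the missing argument directly instead of a bare `IllegalArgumentException`.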
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -285,8 +324,8 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
* 1. A set of pre-specified keys exists.
* 2. The key of the record is not contained in the set.
*/
- protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys) {
- String recordKey = readerContext.getValue(record, readerSchema, keyFieldName).toString();
+ protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema dataBlockSchema) {
Review Comment:
Is `dataBlockSchema` the writer schema? If so, rename it to `writerSchema`?
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -73,6 +77,75 @@ public static Schema convert(InternalSchema internalSchema, String name) {
return buildAvroSchemaFromInternalSchema(internalSchema, name);
}
+ public static InternalSchema pruneAvroSchemaToInternalSchema(Schema schema, InternalSchema originSchema) {
Review Comment:
To clarify, is this only used for internal schema? Does schema evolution
incur record conversion between Row and Avro records (which should be avoided
as much as possible)?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -275,6 +311,9 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
if (mergedRecord.isPresent()
&& !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), payloadProps)) {
+ if (!mergedRecord.get().getRight().equals(readerSchema)) {
+ return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
Review Comment:
Do partial updates need schema evolution handling like this?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -242,7 +250,44 @@ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock d
} else {
blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
}
- return Pair.of(blockRecordsIterator, dataBlock.getSchema());
+ Option<Pair<Function<T,T>, Schema>> schemaEvolutionTransformerOpt =
+ composeEvolvedSchemaTransformer(dataBlock);
+
+ // In case when schema has been evolved original persisted records will have to be
+ // transformed to adhere to the new schema
+ Function<T,T> transformer =
+ schemaEvolutionTransformerOpt.map(Pair::getLeft)
+ .orElse(Function.identity());
+
+ Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
+ .orElseGet(dataBlock::getSchema);
+
+ return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, transformer), schema);
+ }
+
+ /**
+ * Get final Read Schema for support evolution.
+ * step1: find the fileSchema for current dataBlock.
+ * step2: determine whether fileSchema is compatible with the final read internalSchema.
+ * step3: merge fileSchema and read internalSchema to produce final read schema.
+ *
+ * @param dataBlock current processed block
+ * @return final read schema.
+ */
+ protected Option<Pair<Function<T,T>, Schema>> composeEvolvedSchemaTransformer(
+ HoodieDataBlock dataBlock) {
+ if (internalSchema.isEmptySchema()) {
+ return Option.empty();
+ }
+
+ long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
+ InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
+ hoodieTableMetaClient, false);
Review Comment:
@jonvex Could you file a follow-up JIRA to track this?
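For context on the hunk quoted above, here is a minimal sketch of the identity-fallback pattern it uses: when no schema evolution applies, records pass through unchanged and the block's own schema is reported. It uses `java.util.Optional` and plain strings for schemas; `Evolution`, `BlockRead`, and `read` are hypothetical names introduced only for this example and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class EvolvedSchemaTransformerSketch {
  // Hypothetical pair of a record transformer and the schema it produces.
  static final class Evolution<T> {
    final Function<T, T> transformer;
    final String targetSchema;

    Evolution(Function<T, T> transformer, String targetSchema) {
      this.transformer = transformer;
      this.targetSchema = targetSchema;
    }
  }

  // Hypothetical result holder: the (possibly transformed) records and their schema.
  static final class BlockRead<T> {
    final List<T> records;
    final String schema;

    BlockRead(List<T> records, String schema) {
      this.records = records;
      this.schema = schema;
    }
  }

  static <T> BlockRead<T> read(List<T> blockRecords, String blockSchema,
                               Optional<Evolution<T>> evolution) {
    // Fall back to the identity function when no evolution is required.
    Function<T, T> transformer =
        evolution.map(e -> e.transformer).orElse(Function.identity());
    // Fall back to the block's own schema in the same case.
    String schema = evolution.map(e -> e.targetSchema).orElse(blockSchema);

    List<T> out = new ArrayList<>();
    for (T record : blockRecords) {
      out.add(transformer.apply(record));
    }
    return new BlockRead<>(out, schema);
  }
}
```

In this sketch, the empty case costs only one identity call per record; the PR itself wraps the iterator lazily rather than materializing a list.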