vinothchandar commented on code in PR #5522:
URL: https://github.com/apache/hudi/pull/5522#discussion_r891853078
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -87,29 +95,67 @@ protected GenericRecord
transformRecordBasedOnNewSchema(GenericDatumReader<Gener
* Create Parquet record iterator that provides a stitched view of record
read from skeleton and bootstrap file.
* Skeleton file is a representation of the bootstrap file inside the table,
with just the bare bone fields needed
* for indexing, writing and other functionality.
- *
*/
- protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O>
table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-
HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-
Schema readSchema, boolean externalSchemaTransformation) throws
IOException {
+ protected Iterator<HoodieRecord> getMergingIterator(HoodieTable<T, I, K, O>
table,
+ HoodieMergeHandle<T, I,
K, O> mergeHandle,
+ HoodieBaseFile baseFile,
+ HoodieFileReader reader,
+ Schema readerSchema,
+ boolean
externalSchemaTransformation) throws IOException {
Path externalFilePath = new
Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new
Configuration(table.getHadoopConf());
- HoodieFileReader<GenericRecord> bootstrapReader =
HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig,
externalFilePath);
+ HoodieFileReader bootstrapReader =
HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath);
+
Schema bootstrapReadSchema;
if (externalSchemaTransformation) {
bootstrapReadSchema = bootstrapReader.getSchema();
} else {
bootstrapReadSchema = mergeHandle.getWriterSchema();
}
- return new MergingIterator<>(reader.getRecordIterator(readSchema),
bootstrapReader.getRecordIterator(bootstrapReadSchema),
- (inputRecordPair) ->
HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(),
inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
+ return new MergingIterator<>(
+ reader.getRecordIterator(readerSchema, HoodieAvroIndexedRecord::new),
+ bootstrapReader.getRecordIterator(bootstrapReadSchema,
HoodieAvroIndexedRecord::new),
+ (oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord,
readerSchema, mergeHandle.getWriterSchemaWithMetaFields()));
+ }
+
+ @Nonnull
+ private static HoodieRecord mergeRecords(HoodieRecord one, HoodieRecord
other, Schema readerSchema, Schema writerSchema) {
+ try {
+ return one.mergeWith(other, readerSchema, writerSchema);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to merge records", e);
+ }
+ }
+
+ protected static HoodieRecord.Mapper createHoodieRecordMapper(HoodieTable<?,
?, ?, ?> table) {
Review Comment:
unused in this PR?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -40,11 +62,192 @@ public HoodieRecord<T> newInstance() {
return new HoodieAvroRecord<>(this);
}
+ @Override
+ public HoodieRecord<T> newInstance(HoodieKey key, HoodieOperation op) {
+ return new HoodieAvroRecord<>(key, data, op);
+ }
+
+ @Override
+ public HoodieRecord<T> newInstance(HoodieKey key) {
+ return new HoodieAvroRecord<>(key, data);
+ }
+
@Override
public T getData() {
if (data == null) {
throw new IllegalStateException("Payload already deflated for record.");
}
return data;
}
+
+ @Override
+ public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ return getRecordKey();
+ }
+
+ @Override
+ public Comparable<?> getOrderingValue() {
+ return data.getOrderingValue();
+ }
+
+ @Override
+ public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop)
throws IOException {
+ return getData().getInsertValue(schema, prop);
+ }
+
+
//////////////////////////////////////////////////////////////////////////////
+
+ //
+ // NOTE: This method duplicates those ones of the HoodieRecordPayload and
are placed here
+ // for the duration of RFC-46 implementation, until migration off
`HoodieRecordPayload`
+ // is complete
+ //
+ // TODO cleanup
+
+ // NOTE: This method is assuming semantic that `preCombine` operation is
bound to pick one or the other
+ // object, and may not create a new one
+ @Override
+ public HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord) {
+ T picked = unsafeCast(getData().preCombine(previousRecord.getData()));
+ if (picked instanceof HoodieMetadataPayload) {
Review Comment:
such an easter egg. cc @codope @yihua to file a JIRA to remove this bandaid
if inherited.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -18,14 +18,21 @@
package org.apache.hudi.common.model;
+import org.apache.avro.Schema;
Review Comment:
This class needs to be Avro-free ultimately?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/MappingIterator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.function.Function;
+
+public class MappingIterator<T, R> implements ClosableIterator<R> {
Review Comment:
unit test for the class, if not present already?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used by the reader when returning records.
+ */
+public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
Review Comment:
If it does not exist yet, let's add some UTs for this class?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java:
##########
@@ -179,40 +181,45 @@ public final ClosableIterator<IndexedRecord>
getRecordIterator(List<String> keys
return FilteringIterator.getInstance(allRecords, keySet, fullKey,
this::getRecordKey);
}
- protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload()
throws IOException {
+ protected ClosableIterator<HoodieRecord>
readRecordsFromBlockPayload(HoodieRecord.Mapper mapper) throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
try {
- return deserializeRecords(getContent().get());
+ return deserializeRecords(getContent().get(), mapper);
} finally {
// Free up content to be GC'd by deflating the block
deflate();
}
}
- protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys,
boolean fullKey) throws IOException {
+ protected ClosableIterator<HoodieRecord> lookupRecords(List<String> keys,
boolean fullKey, HoodieRecord.Mapper mapper) throws IOException {
throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type
(%s)", getBlockType())
);
}
- protected abstract byte[] serializeRecords(List<IndexedRecord> records)
throws IOException;
+ protected abstract byte[] serializeRecords(List<HoodieRecord> records)
throws IOException;
- protected abstract ClosableIterator<IndexedRecord> deserializeRecords(byte[]
content) throws IOException;
+ protected abstract ClosableIterator<HoodieRecord> deserializeRecords(byte[]
content, HoodieRecord.Mapper mapper) throws IOException;
public abstract HoodieLogBlockType getBlockType();
protected Option<Schema.Field> getKeyField(Schema schema) {
return Option.ofNullable(schema.getField(keyFieldName));
}
- protected Option<String> getRecordKey(IndexedRecord record) {
- return getKeyField(record.getSchema())
- .map(keyField -> record.get(keyField.pos()))
- .map(Object::toString);
+ protected Option<String> getRecordKey(HoodieRecord record) {
+ if (record instanceof HoodieAvroIndexedRecord) {
Review Comment:
Is there a plan to ultimately get rid of the instanceof? I feel this should
be abstracted away.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java:
##########
@@ -120,19 +122,21 @@ protected byte[] serializeRecords(List<IndexedRecord>
records) throws IOExceptio
// Serialize records into bytes
Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
- Iterator<IndexedRecord> itr = records.iterator();
+ // Get writer schema
Review Comment:
Is there a possibility the writer schema is not present? We need to think through
the schema evolution cases here more carefully and ensure all is good. Thinking out loud.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -142,7 +144,8 @@ public void runMerge(HoodieTable<T,
HoodieData<HoodieRecord<T>>, HoodieData<Hood
if (!externalSchemaTransformation) {
return record;
}
- return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache,
decoderCache, (GenericRecord) record);
+ // TODO Other type of record need to change
+ return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache,
decoderCache, (GenericRecord) ((HoodieRecord)record).getData());
Review Comment:
we need to ultimately make this avro free as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]