nsivabalan commented on code in PR #13964:
URL: https://github.com/apache/hudi/pull/13964#discussion_r2371466164
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -192,8 +192,8 @@ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, Str
}
@Override
-  public SizeEstimator<BufferedRecord<IndexedRecord>> getRecordSizeEstimator() {
-    return new AvroRecordSizeEstimator(getSchemaHandler().getRequiredSchema());
+  public SizeEstimator<BufferedRecord<IndexedRecord>> getRecordSizeEstimator(Option<Schema> recordSchemaOpt) {
+    return new AvroRecordSizeEstimator(recordSchemaOpt.orElse(getSchemaHandler().getRequiredSchema()));
Review Comment:
this size estimator is only used for the external spillable map.
we have 2 use-cases for it:
1. COW merges.
2. MOR compaction.
for (1), incoming records will not contain meta fields.
for (2), log records will contain meta fields.
so for (2) we can afford to use `schemaHandler.getRequiredSchema()`, whereas for (1) we can't, and so we are overriding it here.
can you help me understand why the current fix may not work?
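For reference, a minimal sketch of the fallback the diff implements. All names here are illustrative stand-ins (plain strings instead of Avro schemas), not Hudi's actual types:

```java
import java.util.Optional;

// Illustrative stand-in for the schema selection in the diff: prefer the
// caller-supplied record schema (COW merges, where incoming records lack meta
// fields), and fall back to the schema handler's required schema (MOR
// compaction, where log records do carry meta fields).
class SizeEstimatorSchemaSketch {
  static String chooseSchema(Optional<String> recordSchemaOpt, String requiredSchema) {
    return recordSchemaOpt.orElse(requiredSchema);
  }
}
```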
##########
hudi-common/src/main/java/org/apache/hudi/common/model/SerializableIndexedRecord.java:
##########
@@ -46,6 +46,7 @@ public class SerializableIndexedRecord implements GenericRecord, KryoSerializabl
private static final long serialVersionUID = 1L;
private IndexedRecord record;
+ private Schema schema;
Review Comment:
My plan was to use the default serializer only and do away w/ the custom serializer,
because if we look at `BufferedRecord`, it is going to contain a `SerializableIndexedRecord` which is only going to contain the `byte[]` w/o the actual generic record.
But the new `inflate` api that I introduced does not look elegant and might complicate things across all the diff write paths.
so, we can keep the `schema` here.
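A hedged sketch of the idea discussed here, with hypothetical names (this is not Hudi's `SerializableIndexedRecord`): keeping the schema next to the serialized bytes makes lazy decoding self-contained, so callers never need a separate `inflate` step at each write path:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch only. The serialized form carries bytes plus the schema
// needed to decode them; the record is materialized on first field access.
class LazyRecordSketch {
  private byte[] payload;        // serialized record bytes (the only state after ser/deser)
  private final String schema;   // kept alongside the bytes, as in the diff
  private String[] fields;       // materialized form; null until first access

  LazyRecordSketch(byte[] payload, String schema) {
    this.payload = payload;
    this.schema = schema;
  }

  // Decode lazily: no explicit inflate() call is needed by callers.
  private String[] getData() {
    if (fields == null) {
      // Stand-in "decoder": real code would use an Avro decoder driven by the schema.
      fields = new String(payload, StandardCharsets.UTF_8).split(",");
      payload = null; // drop the serialized form once materialized
    }
    return fields;
  }

  String get(int i) {
    return getData()[i];
  }
}
```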
##########
hudi-common/src/main/java/org/apache/hudi/common/model/SerializableIndexedRecord.java:
##########
@@ -59,17 +60,17 @@ private SerializableIndexedRecord(IndexedRecord record) {
@Override
public void put(int i, Object v) {
- record.put(i, v);
+ getData().put(i, v);
}
@Override
public Object get(int i) {
- return record.get(i);
+ return getData().get(i);
Review Comment:
yes, that also needs to be fixed
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -126,6 +126,8 @@ private HoodieAvroIndexedRecord(
this.newLocation = record.getNewLocation();
this.ignoreIndexUpdate = record.getIgnoreIndexUpdate();
this.binaryRecord = (SerializableIndexedRecord) this.data;
+ this.isDelete = record.isDelete;
Review Comment:
Note To Reviewer:
this fixes the correct propagation of `ordering values` to write handles.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -130,9 +131,12 @@ object HoodieCreateRecordUtils {
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
    val requiresPayload = isChangingRecords(operation) && !config.isFileGroupReaderBasedMergeHandle
+    val mergeProps = ConfigUtils.getMergeProps(config.getProps, args.tableConfig.getProps)
+    val deleteContext = new DeleteContext(mergeProps, writerSchema).withReaderSchema(writerSchema);
// handle dropping partition columns
it.map { avroRec =>
+      val isDelete: Boolean = AvroRecordContext.getFieldAccessorInstance.isDeleteRecord(avroRec, deleteContext)
Review Comment:
essentially, we need all components of `BufferedRecord`, like the ordering value and isDelete, to be part of `HoodieRecord` itself,
so that within write handles we can afford to lazily deserialize the `SerializableIndexedRecord`.
In fact, for commit time ordering and event time ordering we may not even need to deserialize at all,
and hence we are looking to fix all record creation code snippets to account for custom deletes.
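As a rough illustration of why this avoids deserialization (all names here are hypothetical, not Hudi's API): if the ordering value and delete flag ride on the record wrapper itself, event-time merging can pick a winner by comparing those fields alone, leaving the serialized payload untouched:

```java
// Hypothetical sketch: the payload bytes are never decoded during the merge
// decision, because orderingValue and isDelete were computed once at
// record-creation time (e.g. via the DeleteContext in the diff above).
class RecordEnvelopeSketch {
  final byte[] payload;       // serialized record; untouched by the merge below
  final long orderingValue;   // precomputed ordering field value
  final boolean isDelete;     // precomputed custom-delete flag

  RecordEnvelopeSketch(byte[] payload, long orderingValue, boolean isDelete) {
    this.payload = payload;
    this.orderingValue = orderingValue;
    this.isDelete = isDelete;
  }

  // Event-time ordering: the record with the newer ordering value wins,
  // with ties going to the incoming (newer) record.
  static RecordEnvelopeSketch merge(RecordEnvelopeSketch older, RecordEnvelopeSketch newer) {
    return newer.orderingValue >= older.orderingValue ? newer : older;
  }
}
```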
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]