nsivabalan commented on code in PR #13987:
URL: https://github.com/apache/hudi/pull/13987#discussion_r2389281476
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -116,8 +118,18 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
options: util.Map[String, String],
schemaStr: String): Unit = {
val schema = new Schema.Parser().parse(schemaStr)
-    val genericRecords = spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema, CollectionUtils.emptyProps))
-      .filter(r => r.isPresent).map(r => r.get.getData.asInstanceOf[GenericRecord]).toSeq, 2)
+    val genericRecords: RDD[GenericRecord] = spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema, CollectionUtils.emptyProps))
+      .filter(r => r.isPresent).map(r => {
+        val data = r.get.getData
+        if (data.isInstanceOf[SerializableIndexedRecord]) {
+          // accessing a field to trigger deser of indexed record
+          data.get(0)
Review Comment:
I have added a private method. but as of now, not using it in many places.
if I come across more, will re-use.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java:
##########
@@ -51,8 +49,8 @@ private static Stream<Arguments> testArgs() {
return b.build();
}
-  @ParameterizedTest
-  @MethodSource("testArgs")
+  //@ParameterizedTest
+  //@MethodSource("testArgs") Disabling to target green CI w/o this flaky test. Will revert before opening it up for review.
Review Comment:
yes
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java:
##########
@@ -297,6 +297,8 @@ public void testTagLocation() throws Exception {
for (HoodieRecord record : taggedRecordRDD.collect()) {
     IndexedRecord data = record.toIndexedRecord(SIMPLE_RECORD_SCHEMA, CollectionUtils.emptyProps()).get().getData();
+ // eager deser.
+ data.get(0);
Review Comment:
If not, the equality check below fails with:
```
java.lang.IllegalArgumentException: Records must be deserialized before equality check
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.common.model.SerializableIndexedRecord.equals(SerializableIndexedRecord.java:166)
	at org.junit.jupiter.api.AssertionUtils.objectsAreEqual(AssertionUtils.java:110)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:181)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
	at org.apache.hudi.index.bloom.TestHoodieGlobalBloomIndex.testTagLocation(TestHoodieGlobalBloomIndex.java:305)
	at java.lang.reflect.Method.invoke(Method.java:498)
```
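To illustrate the failure mode outside of Hudi: a minimal, self-contained sketch (the `LazyRecord` class below is hypothetical, not Hudi's actual `SerializableIndexedRecord`) of a lazily decoded record whose `equals` refuses to compare still-encoded instances. Touching any field first, as the added `data.get(0)` does, forces decoding and avoids the error:

```java
import java.util.Arrays;

// Hypothetical stand-in for SerializableIndexedRecord: holds encoded bytes and
// decodes lazily; equals() refuses to compare records that are still encoded.
class LazyRecord {
    private byte[] encoded;   // non-null while the record is still serialized
    private Object[] fields;  // populated on first field access

    LazyRecord(byte[] encoded) {
        this.encoded = encoded;
    }

    // Accessing any field triggers deserialization, mirroring `data.get(0)`.
    Object get(int i) {
        if (fields == null) {
            // Toy "decoding": split a comma-separated byte payload into fields.
            fields = new String(encoded).split(",");
            encoded = null;
        }
        return fields[i];
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LazyRecord)) {
            return false;
        }
        LazyRecord other = (LazyRecord) o;
        if (this.fields == null || other.fields == null) {
            throw new IllegalArgumentException("Records must be deserialized before equality check");
        }
        return Arrays.equals(this.fields, other.fields);
    }

    @Override
    public int hashCode() {
        return fields == null ? 0 : Arrays.hashCode(fields);
    }
}

public class LazyRecordDemo {
    public static void main(String[] args) {
        LazyRecord a = new LazyRecord("k1,v1".getBytes());
        LazyRecord b = new LazyRecord("k1,v1".getBytes());
        boolean threw = false;
        try {
            a.equals(b);  // both still encoded -> throws
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        System.out.println("equals before decode threw: " + threw);

        a.get(0);  // eager deser, as in the test fix above
        b.get(0);
        System.out.println("equal after decode: " + a.equals(b));
    }
}
```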
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestUpdateProcessor.java:
##########
@@ -174,6 +175,13 @@ void testHandleInsertWithPayload(boolean shouldIgnore) {
}
}
+  private static BufferedRecord<IndexedRecord> getRecordWithSerializableIndexedRecord(String value, HoodieOperation operation) {
+    GenericRecord record = new GenericData.Record(SCHEMA);
+    record.put("key", KEY);
+    record.put("value", value);
+    return new BufferedRecord<>(KEY, 1, new HoodieAvroIndexedRecord(record).getData(), 0, operation);
Review Comment:
sure
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -453,11 +454,15 @@ public void testBasicWriteAndScan(int tableVersion) throws IOException, URISynta
List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(copyOfRecords.size(), recordsRead.size(),
"Read records size should be equal to the written records size");
-    assertEquals(copyOfRecords, recordsRead,
+    assertEquals(getSerializableIndexedRecordList(copyOfRecords), recordsRead,
"Both records lists should be the same. (ordering guaranteed)");
reader.close();
}
+  private List<IndexedRecord> getSerializableIndexedRecordList(List<IndexedRecord> indexedRecords) {
Review Comment:
sure.
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -743,12 +748,17 @@ private void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapTyp
List<IndexedRecord> scannedRecords = new ArrayList<>();
for (HoodieRecord record : scanner) {
-      scannedRecords.add(record.toIndexedRecord(schema, CollectionUtils.emptyProps()).get().getData());
+      Object data = record.toIndexedRecord(schema, CollectionUtils.emptyProps()).get().getData();
+      if (data instanceof SerializableIndexedRecord) {
+        scannedRecords.add((GenericRecord) ((SerializableIndexedRecord) data).getData());
Review Comment:
sure
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -318,6 +318,9 @@ public long getCurrentPosition() {
*/
public void setIgnoreIndexUpdate(boolean ignoreFlag) {
this.ignoreIndexUpdate = ignoreFlag;
+ if (ignoreFlag) {
Review Comment:
This is an optimization. Previously, we had to explicitly check for the operation type here:
https://github.com/apache/hudi/blob/c9069fe5794840971c1710034b08a0f36f53518e/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java#L91
But now we have removed that check, and it works without any additional fix, since we fixed `HoodieRecord` to track this inherently.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java:
##########
@@ -425,7 +426,7 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean
protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
List<GenericRecord> avroRecords = records.stream()
- .map(r -> (GenericRecord) r.getData())
+        .map(r -> (GenericRecord) ((SerializableIndexedRecord) r.getData()).getData())
Review Comment:
We end up calling `AvroConversionUtils.createDataFrame` (
https://github.com/apache/hudi/blob/c9069fe5794840971c1710034b08a0f36f53518e/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala#L116
), which in turn calls `AvroDeserializer` to deserialize the Avro records. Without the fix above, we end up with the exception below.
```
Caused by: java.lang.NullPointerException
	at org.apache.avro.Schema.applyAliases(Schema.java:1890)
	at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:131)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:238)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:228)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:220)
	at org.apache.hudi.common.model.SerializableIndexedRecord.getData(SerializableIndexedRecord.java:106)
	at org.apache.hudi.common.model.SerializableIndexedRecord.get(SerializableIndexedRecord.java:81)
	at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3(AvroDeserializer.scala:391)
	at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3$adapted(AvroDeserializer.scala:387)
	at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
	at org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
	at org.apache.spark.sql.avro.HoodieSpark3_5AvroDeserializer.deserialize(HoodieSpark3_5AvroDeserializer.scala:30)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:54)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:92)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:123)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
```
In other words, `data` goes through serialization and deserialization while the DataFrame is being created, so the GenericRecord inside `SerializableIndexedRecord` ends up in binary form. Since `decodeRecord(Schema)` can only be invoked from within `HoodieAvroIndexedRecord`, I could not find a way out apart from converting to GenericRecords within `toDataset` here.
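The shape of the fix can be sketched outside of Hudi and Spark with plain Java (the `LazyWrapper` class and `toRow` converter below are hypothetical stand-ins, not the real `SerializableIndexedRecord` or `AvroDeserializer`): a downstream converter only understands fully materialized records, so the wrappers must be unwrapped before the list is handed over.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the toDataset fix: unwrap lazy record wrappers to
// their plain payload before a converter that cannot decode them sees them.
public class ToDatasetSketch {

    // Stand-in for SerializableIndexedRecord: wraps a materialized record.
    static class LazyWrapper {
        private final String payload;
        LazyWrapper(String payload) { this.payload = payload; }
        // Mirrors SerializableIndexedRecord#getData(): return the plain record.
        String getData() { return payload; }
    }

    // Stand-in for the Avro deserializer: it rejects lazy wrappers.
    static String toRow(Object record) {
        if (record instanceof LazyWrapper) {
            throw new IllegalStateException("wrapper leaked into the converter");
        }
        return "Row(" + record + ")";
    }

    // The fix: unwrap every record first, mirroring
    // ((SerializableIndexedRecord) r.getData()).getData() in toDataset.
    static List<String> toDataset(List<LazyWrapper> records) {
        return records.stream()
            .map(LazyWrapper::getData)
            .map(ToDatasetSketch::toRow)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<LazyWrapper> records =
            Arrays.asList(new LazyWrapper("rec1"), new LazyWrapper("rec2"));
        System.out.println(toDataset(records)); // [Row(rec1), Row(rec2)]
    }
}
```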
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]