nsivabalan commented on code in PR #13987:
URL: https://github.com/apache/hudi/pull/13987#discussion_r2389281476


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -116,8 +118,18 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
                              options: util.Map[String, String],
                              schemaStr: String): Unit = {
     val schema = new Schema.Parser().parse(schemaStr)
-    val genericRecords = 
spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema, 
CollectionUtils.emptyProps))
-      .filter(r => r.isPresent).map(r => 
r.get.getData.asInstanceOf[GenericRecord]).toSeq, 2)
+    val genericRecords : RDD[GenericRecord] = 
spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema, 
CollectionUtils.emptyProps))
+      .filter(r => r.isPresent).map(r =>  {
+      val data = r.get.getData
+      if (data.isInstanceOf[SerializableIndexedRecord])
+      {
+        // accessing a field to trigger deser of indexed record
+        data.get(0)

Review Comment:
   I have added a private method. but as of now, not using it in many places. 
if I come across more, will re-use. 
   



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java:
##########
@@ -51,8 +49,8 @@ private static Stream<Arguments> testArgs() {
     return b.build();
   }
 
-  @ParameterizedTest
-  @MethodSource("testArgs")
+  //@ParameterizedTest
+  //@MethodSource("testArgs") Disabling to target green CI w/o this flaky 
test. Will revert before opening it up for review.

Review Comment:
   yes



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java:
##########
@@ -297,6 +297,8 @@ public void testTagLocation() throws Exception {
 
     for (HoodieRecord record : taggedRecordRDD.collect()) {
       IndexedRecord data = record.toIndexedRecord(SIMPLE_RECORD_SCHEMA, 
CollectionUtils.emptyProps()).get().getData();
+      // eager deser.
+      data.get(0);

Review Comment:
   if not, equality check below fails 
   ```
   java.lang.IllegalArgumentException: Records must be deserialized before 
equality check
   
        at 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
        at 
org.apache.hudi.common.model.SerializableIndexedRecord.equals(SerializableIndexedRecord.java:166)
        at 
org.junit.jupiter.api.AssertionUtils.objectsAreEqual(AssertionUtils.java:110)
        at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:181)
        at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
        at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
        at 
org.apache.hudi.index.bloom.TestHoodieGlobalBloomIndex.testTagLocation(TestHoodieGlobalBloomIndex.java:305)
        at java.lang.reflect.Method.invoke(Method.java:498)
   
   ``` 



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestUpdateProcessor.java:
##########
@@ -174,6 +175,13 @@ void testHandleInsertWithPayload(boolean shouldIgnore) {
     }
   }
 
+  private static BufferedRecord<IndexedRecord> 
getRecordWithSerializableIndexedRecord(String value, HoodieOperation operation) 
{
+    GenericRecord record = new GenericData.Record(SCHEMA);
+    record.put("key", KEY);
+    record.put("value", value);
+    return new BufferedRecord<>(KEY, 1, new 
HoodieAvroIndexedRecord(record).getData(), 0, operation);

Review Comment:
   sure



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -453,11 +454,15 @@ public void testBasicWriteAndScan(int tableVersion) 
throws IOException, URISynta
     List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
     assertEquals(copyOfRecords.size(), recordsRead.size(),
         "Read records size should be equal to the written records size");
-    assertEquals(copyOfRecords, recordsRead,
+    assertEquals(getSerializableIndexedRecordList(copyOfRecords), recordsRead,
         "Both records lists should be the same. (ordering guaranteed)");
     reader.close();
   }
 
+  private List<IndexedRecord> 
getSerializableIndexedRecordList(List<IndexedRecord> indexedRecords) {

Review Comment:
   sure.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -743,12 +748,17 @@ private void 
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapTyp
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
-      scannedRecords.add(record.toIndexedRecord(schema, 
CollectionUtils.emptyProps()).get().getData());
+      Object data = record.toIndexedRecord(schema, 
CollectionUtils.emptyProps()).get().getData();
+      if (data instanceof SerializableIndexedRecord) {
+        scannedRecords.add((GenericRecord) ((SerializableIndexedRecord) 
data).getData());

Review Comment:
   sure



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -318,6 +318,9 @@ public long getCurrentPosition() {
    */
   public void setIgnoreIndexUpdate(boolean ignoreFlag) {
     this.ignoreIndexUpdate = ignoreFlag;
+    if (ignoreFlag) {

Review Comment:
   this is an optimization. 
   previously, we had to explicitly check for operation type here 
https://github.com/apache/hudi/blob/c9069fe5794840971c1710034b08a0f36f53518e/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java#L91
 
   but now, we have removed that. and its working w/o any additional fix, since 
we fixed the `HoodieRecord` inherently to track this



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java:
##########
@@ -425,7 +426,7 @@ protected HoodieWriteConfig.Builder 
getConfigBuilder(Boolean autoCommit, Boolean
 
   protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
     List<GenericRecord> avroRecords = records.stream()
-        .map(r -> (GenericRecord) r.getData())
+        .map(r -> (GenericRecord) 
((SerializableIndexedRecord)r.getData()).getData())

Review Comment:
   we end up calling  `AvroConversionUtils.createDataFrame` ( 
https://github.com/apache/hudi/blob/c9069fe5794840971c1710034b08a0f36f53518e/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala#L116
 )
   which in turn ends up calling `AvroDeserializer` to deserialize avro 
records. 
   
   and we end up w/ below exception if not for above fix. 
   ```
   Caused by: java.lang.NullPointerException
        at org.apache.avro.Schema.applyAliases(Schema.java:1890)
        at 
org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:131)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:238)
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:228)
        at 
org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:220)
        at 
org.apache.hudi.common.model.SerializableIndexedRecord.getData(SerializableIndexedRecord.java:106)
        at 
org.apache.hudi.common.model.SerializableIndexedRecord.get(SerializableIndexedRecord.java:81)
        at 
org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3(AvroDeserializer.scala:391)
        at 
org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3$adapted(AvroDeserializer.scala:387)
        at 
org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
        at 
org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
        at 
org.apache.spark.sql.avro.HoodieSpark3_5AvroDeserializer.deserialize(HoodieSpark3_5AvroDeserializer.scala:30)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:54)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:92)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:123)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
   ```
   
   
   In other words,
   `data` goes through ser and deser while creating dataframe, and hence the 
genericRecord in `SerializableIndexedRecord` is converted to binary. So, unless 
we call `decodeRecord(Schema)` which can only be invoked from within 
`HoodieAvroIndexedRecord`, I could not find a way out apart from converting to 
GenericRecords   within `toDataset` here. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to