dataproblems opened a new issue, #12571:
URL: https://github.com/apache/hudi/issues/12571

   **Describe the problem you faced**
   
   I created a custom payload class for merging records on a Hudi table that has the record level index enabled, and then tried to upsert values into the table. The upsert succeeds for a single record as well as for batches of fewer than 50k records, but once I try 50k records (or more) it fails with an UnsupportedOperationException (stacktrace provided below).
   
   **To Reproduce**
   
   ### Step 1: Generate the sample data
   
   #### Case class that encapsulates my dataset
   
   ```
   import java.util.UUID

   final case class RandomData(
     id: String,
     field1: String,
     field2: String,
     field3: String,
     field4: Boolean,
     field5: Long,
     ts: Long,
     partition: String,
     fruits: String
   ) {
     def combine(that: RandomData): RandomData = {
       val updatedFruit: String = this.fruits + s",${that.fruits}"
       RandomData(
         id = this.id,
         field1 = this.field1,
         field2 = this.field2,
         field3 = this.field3,
         field4 = this.field4,
         field5 = this.field5,
         ts = Math.max(this.ts, that.ts),
         partition = that.partition,
         fruits = updatedFruit
       )
     }
   }
   
   object RandomData {
     def apply(id: Long, partition: String, fruits: String): RandomData = {
       RandomData(
         id = id.toString,
         field1 = UUID.randomUUID().toString,
         field2 = UUID.randomUUID().toString,
         field3 = UUID.randomUUID().toString,
         field4 = true,
         field5 = 1000L,
         ts = 2880000L,
         partition = partition,
         fruits = fruits
       )
     }
   }
   ```
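   
   For clarity, this is the merge behavior I expect from `combine` (a quick sketch against the definitions above): the larger `ts` wins and the `fruits` values are concatenated.
   
   ```
   // illustrative only: merging two versions of the same record key
   val older = RandomData(id = 1L, partition = "One", fruits = "apple")
   val newer = older.copy(ts = older.ts + 100, fruits = "banana")

   val merged = older.combine(newer)
   // merged.ts == older.ts + 100      (max of the two ordering values)
   // merged.fruits == "apple,banana"  (fruits are concatenated)
   ```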
   
   #### Spark code to generate sample dataframe
   
   ```
   import scala.util.Random
   import spark.implicits._

   val partitions = List("One", "Two", "Three", "Four")

   val randomData = spark
       .range(1, 10 * 10000000L)
       .map(f => RandomData(id = f, partition = Random.shuffle(partitions).head, fruits = "apple"))
   ```
   
   ### Step 2: Create the Hudi table from the random data
   
   ```
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.config.HoodieIndexConfig
   import org.apache.hudi.common.config.HoodieStorageConfig
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.keygen.SimpleKeyGenerator

   val tableName = "randomDataWithFruits"

   val insertOptions: Map[String, String] = Map(
     DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
     DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
     HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
     HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
     HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
     DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
     "hoodie.metadata.record.index.enable" -> "true",
     "hoodie.metadata.enable" -> "true",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.write.partitionpath.field" -> "partition",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.datasource.write.precombine.field" -> "ts",
     "hoodie.table.name" -> tableName,
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
     "hoodie.write.markers.type" -> "DIRECT",
     "hoodie.embed.timeline.server" -> "false"
   )

   val basePath = "s3://..." // some S3 path

   randomData.repartition(100).write.format("hudi").options(insertOptions).mode(Overwrite).save(basePath)
   ```
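   
   As an optional sanity check (not strictly part of the repro), the initial write can be verified by reading the table back:
   
   ```
   // optional: confirm the insert landed and record keys are unique
   val written = spark.read.format("hudi").load(basePath)
   written.count()                          // expect ~100M rows
   written.select("id").distinct().count()  // should match the row count
   ```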
   
   ### Step 3: Create a custom Hudi payload class
   
   ```
   import java.io.IOException
   import java.util.Properties

   import org.apache.avro.Schema
   import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
   import org.apache.avro.reflect.ReflectData
   import org.apache.hudi.common.model.{BaseAvroPayload, HoodieRecordPayload}
   import org.apache.hudi.common.util.{Option => HudiOption}

   class RandomDataPayload(record: GenericRecord, orderingVal: Comparable[_])
       extends BaseAvroPayload(record, orderingVal)
       with HoodieRecordPayload[RandomDataPayload] {

     def this(record: HudiOption[GenericRecord]) =
       this(record = record.orElse(null), orderingVal = 0)

     def toAvro: RandomData => GenericRecord = RandomDataPayload.toAvro

     def fromAvro: GenericRecord => RandomData = RandomDataPayload.fromAvro

     override def preCombine(oldValue: RandomDataPayload): RandomDataPayload = {
       (oldValue.recordBytes.isEmpty,
        oldValue.orderingVal.asInstanceOf[Comparable[Any]].compareTo(orderingVal)) match {
         case (true, _)            => this
         case (false, c) if c > 0  => oldValue
         case (false, c) if c <= 0 => this
       }
     }

     override def preCombine(oldValue: RandomDataPayload, schema: Schema,
                             properties: Properties): RandomDataPayload =
       preCombine(oldValue = oldValue)

     @throws[IOException]
     override def combineAndGetUpdateValue(valueInStorage: IndexedRecord,
                                           schema: Schema): HudiOption[IndexedRecord] = {
       val currentValueObject = fromAvro(valueInStorage.asInstanceOf[GenericRecord])
       val otherObject = fromAvro(this.record)

       HudiOption.of(toAvro(currentValueObject.combine(otherObject)))
     }

     @throws[IOException]
     override def combineAndGetUpdateValue(valueInStorage: IndexedRecord, schema: Schema,
                                           properties: Properties): HudiOption[IndexedRecord] =
       combineAndGetUpdateValue(valueInStorage = valueInStorage, schema = schema)

     @throws[IOException]
     override def getInsertValue(schema: Schema): HudiOption[IndexedRecord] = {
       if (recordBytes.isEmpty || isDeletedRecord) {
         HudiOption.empty[IndexedRecord]
       } else {
         HudiOption.of(record)
       }
     }

     @throws[IOException]
     override def getInsertValue(schema: Schema, properties: Properties): HudiOption[IndexedRecord] =
       getInsertValue(schema)
   }

   object RandomDataPayload {
     val RandomDataSchema: Schema = ReflectData.get.getSchema(classOf[RandomData])

     def toAvro(payload: RandomData): GenericRecord = {
       val record = new GenericData.Record(RandomDataSchema)
       record.put("id", payload.id)
       record.put("field1", payload.field1)
       record.put("field2", payload.field2)
       record.put("field3", payload.field3)
       record.put("field4", payload.field4)
       record.put("field5", payload.field5)
       record.put("ts", payload.ts)
       record.put("partition", payload.partition)
       record.put("fruits", payload.fruits)
       record
     }

     def fromAvro(record: GenericRecord): RandomData = {
       RandomData(
         id = record.get("id").toString,
         field1 = record.get("field1").toString,
         field2 = record.get("field2").toString,
         field3 = record.get("field3").toString,
         field4 = record.get("field4").toString.toBoolean,
         field5 = record.get("field5").toString.toLong,
         partition = record.get("partition").toString,
         ts = record.get("ts").toString.toLong,
         fruits = record.get("fruits").toString
       )
     }
   }
   ```
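   
   The conversion helpers are not on the failing path, but a quick round-trip sketch confirms they behave as expected:
   
   ```
   // sanity check: RandomData survives a toAvro/fromAvro round trip
   val sample = RandomData(id = 1L, partition = "One", fruits = "apple")
   val roundTripped = RandomDataPayload.fromAvro(RandomDataPayload.toAvro(sample))
   assert(roundTripped == sample)
   ```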
   
   ### Step 4: Create a data parcel for the upsert 
   
   ```
   val updateParcel = randomData.map(f => f.copy(ts = f.ts + 100, fruits = "banana")).limit(50000)
   ```
   
   ### Step 5: Perform the upsert operation 
   
   ```
   val randomDataUpsertOptions: Map[String, String] = Map(
     "hoodie.datasource.write.precombine.field" -> "ts",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.table.name" -> "randomDataWithFruits",
     "hoodie.datasource.write.partitionpath.field" -> "partition",
     "hoodie.datasource.write.payload.class" -> classOf[RandomDataPayload].getName,
     "hoodie.upsert.shuffle.parallelism" -> "2000"
   )

   updateParcel.write.format("hudi").mode("append").options(randomDataUpsertOptions).save(basePath)
   ```
   
   
   **Expected behavior**
   
   I expect the upsert operation to go through without any exceptions. 
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.4.1
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   This problem does not surface when I upsert 20k or 30k rows, but it does once I go higher.
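   
   Since the stacktrace below goes through `ExternalSpillableMap` and `BitCaskDiskMap`, my guess is that larger batches push the merge map past its in-memory budget, and the failure only happens on the Kryo-serialized spill path. A sketch of an experiment to test that, assuming the standard `hoodie.memory.merge.max.size` and `hoodie.common.spillable.diskmap.type` configs apply here:
   
   ```
   // hypothetical experiment: raise the in-memory budget for the merge map
   // (and/or switch the spill map implementation) to see whether the failure
   // tracks the spill-to-disk path
   val spillTestOptions = randomDataUpsertOptions ++ Map(
     "hoodie.memory.merge.max.size" -> (2L * 1024 * 1024 * 1024).toString, // 2 GB
     "hoodie.common.spillable.diskmap.type" -> "ROCKS_DB"
   )

   updateParcel.write.format("hudi").mode("append").options(spillTestOptions).save(basePath)
   ```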
   
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   reserved (org.apache.avro.Schema$Field)
   fieldMap (org.apache.avro.Schema$RecordSchema)
   schema (org.apache.avro.generic.GenericData$Record)
   record (com.test.hudi.RandomDataPayload)
     at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
     ... 33 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   reserved (org.apache.avro.Schema$Field)
   fieldMap (org.apache.avro.Schema$RecordSchema)
   schema (org.apache.avro.generic.GenericData$Record)
   record (com.test.hudi.RandomDataPayload)
     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
     at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:231)
     at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:48)
     at org.apache.hudi.common.model.HoodieRecord.read(HoodieRecord.java:373)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:520)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:512)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
     at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:102)
     at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:76)
     at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
     at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
     at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
     at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:67)
     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:198)
     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:56)
     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:350)
     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
     at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:69)
     ... 34 more
   Caused by: java.lang.UnsupportedOperationException
     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
     ... 66 more
   ```
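   
   From the serialization trace, Kryo is failing while rebuilding the `record` field that my payload class retains (the embedded `Schema$Field.reserved` set is an unmodifiable collection that Kryo cannot repopulate, per the innermost frame). A possible workaround sketch, assuming `HoodieAvroUtils.bytesToAvro` is the right way to rebuild the record: stop referencing the constructor parameter inside methods (so Scala does not capture the `GenericRecord` as a field) and rebuild it from `recordBytes` instead, e.g. inside `RandomDataPayload`:
   
   ```
   import org.apache.hudi.avro.HoodieAvroUtils

   // sketch: rebuild the incoming record from BaseAvroPayload.recordBytes
   // instead of touching the constructor parameter, so the class never
   // retains a Schema-bearing field for Kryo to serialize
   @throws[IOException]
   override def combineAndGetUpdateValue(valueInStorage: IndexedRecord,
                                         schema: Schema): HudiOption[IndexedRecord] = {
     val currentValueObject = fromAvro(valueInStorage.asInstanceOf[GenericRecord])
     val otherObject = fromAvro(
       HoodieAvroUtils.bytesToAvro(recordBytes, RandomDataPayload.RandomDataSchema))
     HudiOption.of(toAvro(currentValueObject.combine(otherObject)))
   }
   ```
   
   `getInsertValue` would need the same treatment. I have not verified this end to end, but it should keep Kryo away from the Avro `Schema` when entries are read back from the spill map.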
   
   

