dataproblems opened a new issue, #12571:
URL: https://github.com/apache/hudi/issues/12571
**Describe the problem you faced**
I created a custom payload class for merging records on a Hudi table with record level index enabled, and then tried to upsert values into the table. The upsert succeeds for a single record as well as for batches of fewer than 50k records, but once I try 50k records (or more) it fails with an UnsupportedOperationException (stacktrace provided below).
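For reference, everything below was run in a spark-shell session set up roughly as follows; this is a sketch of my setup rather than part of the repro, using the usual quickstart confs with the hudi-spark3.4 bundle (matching Spark 3.4.1 / Hudi 0.15.0) on the classpath:
```
// Hypothetical session setup, not part of the repro itself.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-rli-upsert-repro")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()
```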
**To Reproduce**
### Step 1: Generate the data and hudi table
#### Case class that encapsulates my dataset
```
import java.util.UUID

final case class RandomData(
    id: String,
    field1: String,
    field2: String,
    field3: String,
    field4: Boolean,
    field5: Long,
    ts: Long,
    partition: String,
    fruits: String
) {
  // Merge two versions of the same key: concatenate fruits, keep the larger ts.
  def combine(that: RandomData): RandomData = {
    val updatedFruit: String = this.fruits + s",${that.fruits}"
    RandomData(
      id = this.id,
      field1 = this.field1,
      field2 = this.field2,
      field3 = this.field3,
      field4 = this.field4,
      field5 = this.field5,
      ts = Math.max(this.ts, that.ts),
      partition = that.partition,
      fruits = updatedFruit
    )
  }
}

object RandomData {
  def apply(id: Long, partition: String, fruits: String): RandomData = {
    RandomData(
      id = id.toString,
      field1 = UUID.randomUUID().toString,
      field2 = UUID.randomUUID().toString,
      field3 = UUID.randomUUID().toString,
      field4 = true,
      field5 = 1000L,
      ts = 2880000L,
      partition = partition,
      fruits = fruits
    )
  }
}
```
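As a quick aside on the merge semantics of `combine` above: `fruits` values are concatenated and the larger `ts` wins. A minimal check in the shell, using only the class above:
```
val a = RandomData(id = 1L, partition = "One", fruits = "apple")
val b = a.copy(ts = a.ts + 100, fruits = "banana")
val merged = a.combine(b)
// merged.fruits == "apple,banana", merged.ts == b.ts, merged.partition == b.partition
```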
#### Spark code to generate sample dataframe
```
import scala.util.Random
import spark.implicits._

val partitions = List("One", "Two", "Three", "Four")
val randomData = spark
  .range(1, 10 * 10000000L)
  .map(f => RandomData(id = f, partition = Random.shuffle(partitions).head, fruits = "apple"))
```
### Step 2: Create hudi table from the random data
```
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.keygen.SimpleKeyGenerator

val tableName = "randomDataWithFruits"
val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "false"
)

val basePath = "s3://..." // Some S3 path.

randomData
  .repartition(100)
  .write
  .format("hudi")
  .options(insertOptions)
  .mode(Overwrite)
  .save(basePath)
```
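A snapshot read right after the insert serves as a sanity check that the table is readable (the counts here are approximate and just illustrative):
```
val snapshot = spark.read.format("hudi").load(basePath)
snapshot.count()                              // ~100M rows, matching spark.range above
snapshot.groupBy("partition").count().show()  // roughly even split across the four partitions
```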
### Step 3: Create a custom hudi payload class
```
import java.io.IOException
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.avro.reflect.ReflectData
import org.apache.hudi.common.model.{BaseAvroPayload, HoodieRecordPayload}
import org.apache.hudi.common.util.{Option => HudiOption}

class RandomDataPayload(record: GenericRecord, orderingVal: Comparable[_])
    extends BaseAvroPayload(record, orderingVal)
    with HoodieRecordPayload[RandomDataPayload] {

  def this(record: HudiOption[GenericRecord]) =
    this(record = record.orElse(null), orderingVal = 0)

  def toAvro: RandomData => GenericRecord = RandomDataPayload.toAvro
  def fromAvro: GenericRecord => RandomData = RandomDataPayload.fromAvro

  override def preCombine(oldValue: RandomDataPayload): RandomDataPayload = {
    (oldValue.recordBytes.isEmpty,
      oldValue.orderingVal.asInstanceOf[Comparable[Any]].compareTo(orderingVal)) match {
      case (true, _) => this
      case (false, c) if c > 0 => oldValue
      case (false, c) if c <= 0 => this
    }
  }

  override def preCombine(oldValue: RandomDataPayload, schema: Schema,
      properties: Properties): RandomDataPayload = preCombine(oldValue = oldValue)

  @throws[IOException]
  override def combineAndGetUpdateValue(valueInStorage: IndexedRecord,
      schema: Schema): HudiOption[IndexedRecord] = {
    val currentValueObject = fromAvro(valueInStorage.asInstanceOf[GenericRecord])
    val otherObject = fromAvro(this.record)
    HudiOption.of(toAvro(currentValueObject.combine(otherObject)))
  }

  @throws[IOException]
  override def combineAndGetUpdateValue(valueInStorage: IndexedRecord,
      schema: Schema, properties: Properties): HudiOption[IndexedRecord] =
    combineAndGetUpdateValue(valueInStorage = valueInStorage, schema = schema)

  @throws[IOException]
  override def getInsertValue(schema: Schema): HudiOption[IndexedRecord] = {
    if (recordBytes.isEmpty || isDeletedRecord) {
      HudiOption.empty[IndexedRecord]
    } else {
      HudiOption.of(record)
    }
  }

  @throws[IOException]
  override def getInsertValue(schema: Schema, properties: Properties): HudiOption[IndexedRecord] =
    getInsertValue(schema)
}

object RandomDataPayload {
  val RandomDataSchema: Schema = ReflectData.get.getSchema(classOf[RandomData])

  def toAvro(payload: RandomData): GenericRecord = {
    val record = new GenericData.Record(RandomDataSchema)
    record.put("id", payload.id)
    record.put("field1", payload.field1)
    record.put("field2", payload.field2)
    record.put("field3", payload.field3)
    record.put("field4", payload.field4)
    record.put("field5", payload.field5)
    record.put("ts", payload.ts)
    record.put("partition", payload.partition)
    record.put("fruits", payload.fruits)
    record
  }

  def fromAvro(record: GenericRecord): RandomData = {
    RandomData(
      id = record.get("id").toString,
      field1 = record.get("field1").toString,
      field2 = record.get("field2").toString,
      field3 = record.get("field3").toString,
      field4 = record.get("field4").toString.toBoolean,
      field5 = record.get("field5").toString.toLong,
      partition = record.get("partition").toString,
      ts = record.get("ts").toString.toLong,
      fruits = record.get("fruits").toString
    )
  }
}
```
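The Avro helpers round-trip cleanly on a single record, so the payload seems fine in isolation; a minimal check using only the objects above:
```
val sample = RandomData(id = 42L, partition = "Two", fruits = "apple")
val roundTripped = RandomDataPayload.fromAvro(RandomDataPayload.toAvro(sample))
assert(roundTripped == sample)  // holds for the schema derived via ReflectData
```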
### Step 4: Create a data parcel for the upsert
```
val updateParcel = randomData
  .map(f => f.copy(ts = f.ts + 100, fruits = "banana"))
  .limit(50000)
```
### Step 5: Perform the upsert operation
```
val randomDataUpsertOptions: Map[String, String] = Map(
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.table.name" -> "randomDataWithFruits",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.payload.class" -> classOf[RandomDataPayload].getName,
  "hoodie.upsert.shuffle.parallelism" -> "2000"
)

updateParcel
  .write
  .format("hudi")
  .mode("append")
  .options(randomDataUpsertOptions)
  .save(basePath)
```
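For batch sizes where the upsert succeeds, the merged rows look as expected; a hypothetical verification query (assuming `spark.implicits._` is still in scope from Step 1):
```
spark.read.format("hudi").load(basePath)
  .where($"fruits" === "apple,banana")
  .select("id", "ts", "fruits")
  .show(5, truncate = false)
```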
**Expected behavior**
I expect the upsert operation to go through without any exceptions.
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.4.1
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
**Additional context**
This problem does not surface when the upsert batch has 20k or 30k rows, but it does once I go higher; see the sketch below for why the size threshold appears to matter.
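The threshold behaviour lines up with the `ExternalSpillableMap`/`BitCaskDiskMap` frames in the stacktrace below: once the merge map exceeds its in-memory budget (`hoodie.memory.merge.max.size`), records spill to disk and are round-tripped through Hudi's Kryo-based `SerializationUtils`, which is where the failure occurs. The root exception can seemingly be reproduced outside of Hudi with a bare Kryo round trip of an Avro `GenericData.Record`. This is a minimal sketch, assuming Kryo 4.x (the version Hudi shades) and the same Objenesis instantiator strategy Hudi uses; deserialization fails because Kryo tries to `add` into the `Schema`'s unmodifiable collections:
```
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.objenesis.strategy.StdInstantiatorStrategy

val schema = SchemaBuilder.record("Probe").fields().requiredString("id").endRecord()
val rec = new GenericData.Record(schema)
rec.put("id", "1")

val kryo = new Kryo()
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))

val bos = new ByteArrayOutputStream()
val out = new Output(bos)
kryo.writeClassAndObject(out, rec)  // serializing the record (and its Schema) succeeds
out.close()

val in = new Input(bos.toByteArray)
kryo.readClassAndObject(in)  // throws UnsupportedOperationException while rebuilding the Schema
```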
**Stacktrace**
```
Caused by: org.apache.hudi.exception.HoodieException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
record (com.test.hudi.RandomDataPayload)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    ... 33 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
record (com.test.hudi.RandomDataPayload)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:231)
    at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:48)
    at org.apache.hudi.common.model.HoodieRecord.read(HoodieRecord.java:373)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:520)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:512)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:102)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:76)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:67)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:198)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:56)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:350)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:69)
    ... 34 more
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 66 more
```