zuyanton opened a new issue #2237:
URL: https://github.com/apache/hudi/issues/2237
**Describe the problem you faced**

We are running a MoR table with inline compaction. We have noticed that whenever a log file group grows large (by either upserting a large amount of data or postponing compaction for a while), compacting the table fails with:

```
ERROR AbstractHoodieLogRecordScanner: Got exception when reading log file
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
```

After inspecting the stack trace and running a few tests, we believe the problem is in ```ExternalSpillableMap```, more specifically in ```DiskBasedMap```. Whenever ```ExternalSpillableMap``` grows too large, it starts spilling data to disk via ```DiskBasedMap.put```; the error surfaces when the spilled data is read back via ```DiskBasedMap.get```.
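The stack trace below bottoms out in Kryo's class-name lookup, which is a known limitation when a JVM lambda ends up inside a Kryo-serialized object: Kryo writes the lambda's runtime class name, and a name like ```...$$Lambda$119/1991639008``` is not resolvable via ```Class.forName``` on read. Below is a minimal sketch of that failure mode, independent of Hudi; the ```KryoLambdaRepro``` object and the lambda-backed ```Comparable``` are our own illustration, assuming Kryo 4.x (as shaded by Hudi 0.6.0) and Scala 2.12:

```
// Standalone sketch of the failure mode seen in the stack trace below; this is
// an illustration, not Hudi code. Assumes Kryo 4.x (registration not required
// by default) and Scala 2.12, where the SAM conversion below compiles the
// Comparable to a JVM-generated lambda class.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoLambdaRepro {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()

    // A Comparable backed by a lambda: its runtime class has a JVM-generated
    // name such as KryoLambdaRepro$$Lambda$1/123456789.
    val orderingVal: Comparable[Integer] = (o: Integer) => o.compareTo(0)

    // Writing succeeds: Kryo records the lambda's class *name* in the stream.
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, orderingVal)
    output.close()

    // Reading fails: DefaultClassResolver.readName calls Class.forName on the
    // recorded name, and lambda classes are not resolvable by name, so this
    // throws KryoException: Unable to find class ...$$Lambda$... -- the same
    // error DiskBasedMap.get hits when deserializing a spilled record.
    val input = new Input(new ByteArrayInputStream(buffer.toByteArray))
    kryo.readClassAndObject(input)
  }
}
```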
**To Reproduce**

1. Set the Hudi property ```hoodie.memory.compaction.max.size->1``` to ensure that ```ExternalSpillableMap``` spills everything to disk.
2. Set ```"hoodie.compact.inline.max.delta.commits"->"1"``` to trigger compaction sooner.
3. Create a row: ```var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")```
4. Create a Hudi table from this row by running ```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/testTable_zuyanton_1")```
5. Upsert the row a second time with the same write, so Hudi creates a log file group.
6. Upsert the row a third time with the same write to trigger inline compaction.
7. Observe the error.

The stack trace and full code snippet are below; the two key settings are summarized right after this list.
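For quick scanning, these are the settings that make the failure deterministic; all of them appear in the full snippet at the bottom of this issue:

```
// Taken from the reproduction steps above: a 1-byte compaction memory budget
// pushes every record through DiskBasedMap, and a delta-commit threshold of 1
// makes the very next upsert trigger inline compaction.
val reproOptions = Map(
  "hoodie.memory.compaction.max.size"       -> "1",    // spill everything to disk
  "hoodie.compact.inline"                   -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "1"     // compact after every commit
)
```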
**Additional info**
Possibly related to:
https://issues.apache.org/jira/browse/HUDI-1205
https://github.com/apache/hudi/issues/1890
https://github.com/apache/hudi/issues/1823
**Environment Description**
* Hudi version : 0.6.0
* Spark version :
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Stacktrace**
```
20/11/04 03:50:06 ERROR AbstractHoodieLogRecordScanner: Got exception when reading log file
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
Serialization trace:
orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
data (org.apache.hudi.common.model.HoodieRecord)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:222)
	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:215)
	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
	at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:169)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextRecord(HoodieMergedLogRecordScanner.java:115)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:278)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:306)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:153)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
	at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:128)
	at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:99)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
```
**Full code snippet**
```
import org.apache.spark.sql.functions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hadoop.hive.conf.HiveConf

val hiveConf = new HiveConf()
val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))

var hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "testTable_zuyanton",
  "hoodie.consistency.check.enabled" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "1",
  "hoodie.compact.inline" -> "true",
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "testTable_zuyanton",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
  DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000",
  "hoodie.memory.compaction.max.size" -> "1"
)

//spark.sql("drop table if exists testTable_zuyanton_ro")
//spark.sql("drop table if exists testTable_zuyanton_rt")

var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
```