zuyanton opened a new issue #2237:
URL: https://github.com/apache/hudi/issues/2237


   
   **Describe the problem you faced**
   We are running a MoR table with inline compaction enabled. We have 
noticed that whenever a log group grows large (either by upserting a 
large amount of data or by postponing compaction for a while), 
compaction on the table fails with the error ``` ERROR 
AbstractHoodieLogRecordScanner: Got exception when reading log file
   com.esotericsoftware.kryo.KryoException: Unable to find class: 
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008```
 After inspecting the stack trace and running a few tests, we think the problem 
is in ```ExternalSpillableMap```, more specifically in ```DiskBasedMap```. 
Whenever ```ExternalSpillableMap``` grows too large, it starts spilling data 
to disk by invoking ```DiskBasedMap.put```. The error appears when that data 
is read back via ```DiskBasedMap.get```.
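
   The class name in the error has the shape of a JVM-generated lambda class. The following is a minimal sketch, not Hudi or Kryo code, of why such a name cannot be resolved on read. It assumes the deserializer looks classes up by name through ```Class.forName```, as the stack trace below suggests. ```LambdaClassNameDemo```, ```jdkLambda``` and ```isLoadableByName``` are hypothetical names used only for illustration, and the lambda is borrowed from the JDK's ```Collectors.toList()``` so that the sketch does not depend on how a given Scala version compiles its own closures.

```scala
import java.util.stream.Collectors
import scala.util.Try

object LambdaClassNameDemo {
  // A lambda created inside the JDK: Collectors.toList() builds its supplier
  // from a method reference, so the runtime class is a synthetic lambda class.
  def jdkLambda(): AnyRef = Collectors.toList[String]().supplier()

  // True if the class can be found again from its name alone, which is what
  // a name-based deserializer has to do when it reads a record back.
  def isLoadableByName(name: String): Boolean =
    Try(Class.forName(name)).isSuccess

  def main(args: Array[String]): Unit = {
    val lambdaName = jdkLambda().getClass.getName
    println(s"lambda class name: $lambdaName")
    println(s"lambda loadable by name: ${isLoadableByName(lambdaName)}")
    println(s"String loadable by name: ${isLoadableByName(classOf[String].getName)}")
  }
}
```

   If the payload really does hold a lambda in one of its fields, writing the record would store a name like this one, and any later lookup of that name would fail in the same way as the ```ClassNotFoundException``` reported here.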
   
   **To Reproduce**
   1. Set the Hudi property ```hoodie.memory.compaction.max.size->1``` to ensure 
that ```ExternalSpillableMap``` spills everything to disk.   
   2. Set ```"hoodie.compact.inline.max.delta.commits"->"1"``` so that 
compaction triggers sooner.   
   3. Create a single-row DataFrame: ```var df = Seq((1, 2, 3)).toDF("pk", "partition", 
"sort_key")```
   4. Create a Hudi table from this row by running 
```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")```
  
   5. Upsert the same row so that Hudi creates a log group, by running the same command again: 
```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")```
  
   6. Upsert the row a third time to trigger compaction, by running the same command once more: 
```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")```
  
   7. Observe the error.  
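
   For anyone adapting the reproduction to their own table, the settings from steps 1 and 2, together with the inline-compaction flag used in the full snippet, can be kept apart from the rest of the write options. This is only a sketch: ```ReproOptions``` and ```withRepro``` are hypothetical names, and whether the remaining options (Hive sync, key generator) affect the failure has not been checked here.

```scala
object ReproOptions {
  // Settings from the reproduction steps: inline compaction after every
  // delta commit, with a minimal memory budget so the map spills to disk.
  val forceSpillAndCompact: Map[String, String] = Map(
    "hoodie.compact.inline" -> "true",
    "hoodie.compact.inline.max.delta.commits" -> "1",
    "hoodie.memory.compaction.max.size" -> "1"
  )

  // Hypothetical helper: overlay the repro settings on any base options.
  // Keys on the right-hand side win when both maps define the same key.
  def withRepro(base: Map[String, String]): Map[String, String] =
    base ++ forceSpillAndCompact
}
```

   Calling ```ReproOptions.withRepro(hudiOptions)``` on an existing options map would then switch the three settings on without editing the map itself.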
   
   The stack trace and full code snippet are below.  
   
   **Additional info**  
   possibly related to :   
   https://issues.apache.org/jira/browse/HUDI-1205  
   https://github.com/apache/hudi/issues/1890  
   https://github.com/apache/hudi/issues/1823
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   
   **Stacktrace**
   
   ```
   20/11/04 03:50:06 ERROR AbstractHoodieLogRecordScanner: Got exception when reading log file
   com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
   Serialization trace:
   orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
   data (org.apache.hudi.common.model.HoodieRecord)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
        at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
        at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:222)
        at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:215)
        at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
        at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:169)
        at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextRecord(HoodieMergedLogRecordScanner.java:115)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:278)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:306)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:153)
        at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:128)
        at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:99)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
   ```
   
   **Full code snippet**  
   
   ```
     import org.apache.spark.sql.functions._
     import org.apache.hudi.hive.MultiPartKeysValueExtractor
     import org.apache.hudi.QuickstartUtils._
     import scala.collection.JavaConversions._
     import org.apache.spark.sql.SaveMode
     import org.apache.hudi.DataSourceReadOptions._
     import org.apache.hudi.DataSourceWriteOptions._
     import org.apache.hudi.DataSourceWriteOptions
     import org.apache.hudi.config.HoodieWriteConfig._
     import org.apache.hudi.config.HoodieWriteConfig
     import org.apache.hudi.keygen.ComplexKeyGenerator
     import org.apache.hadoop.hive.conf.HiveConf

     // Derive the HiveServer2 host from the metastore URI (strip scheme and port).
     val hiveConf = new HiveConf()
     val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
     val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))

     var hudiOptions = Map[String,String](
       HoodieWriteConfig.TABLE_NAME -> "testTable_zuyanton",
       "hoodie.consistency.check.enabled" -> "true",
       // Run compaction inline after every delta commit.
       "hoodie.compact.inline.max.delta.commits" -> "1",
       "hoodie.compact.inline" -> "true",
       DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
       DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
       DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
       DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
       DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
       DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "testTable_zuyanton",
       DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
       DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
       DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000",
       // Minimal memory budget so that ExternalSpillableMap spills everything to disk.
       "hoodie.memory.compaction.max.size" -> "1"
     )

     //spark.sql("drop table if exists testTable_zuyanton_ro")
     //spark.sql("drop table if exists testTable_zuyanton_rt")
     var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")

     // First write creates the table, second creates a log group, third triggers compaction.
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
   ```
   
   

