stackfun opened a new issue #2369:
URL: https://github.com/apache/hudi/issues/2369


   **Describe the problem you faced**
   
   Read Optimized query fails on GCS for MOR tables. Not sure if the table has 
to have a large amount of partitions to reproduce this issue.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a large MOR table on GCS with thousands of partitions (see the write sketch after the query snippet below).
   2. Run a read-optimized query on the table via the Spark datasource:
   ```
   # pyspark code; path is the GCS base path of the Hudi table
   read_options = {"hoodie.datasource.query.type": "read_optimized"}
   spark.read.format("hudi").options(**read_options).load(path).count()
   ```
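   
   For step 1, a minimal write sketch could look like the following, assuming a DataFrame `df` with matching columns. The table name and the record-key/precombine fields are hypothetical placeholders, not the actual schema; the `ts` partition field matches the `ts=2020-05-01` paths in the trace below.
   
   ```
   # pyspark sketch of step 1; names other than the partition column are placeholders
   hudi_write_options = {
       "hoodie.table.name": "example_table",                      # hypothetical table name
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.datasource.write.recordkey.field": "id",           # hypothetical key field
       "hoodie.datasource.write.partitionpath.field": "ts",       # as in the ts=2020-05-01 partitions
       "hoodie.datasource.write.precombine.field": "updated_at",  # hypothetical ordering field
   }
   df.write.format("hudi").options(**hudi_write_options).mode("append").save(path)
   ```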
   
   **Expected behavior**
   
   Successful read-optimized query
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Dataproc 1.4
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.9.2
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   ```
   org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:820)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:819)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:819)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:208)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:293)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:177)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:176)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:176)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:293)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:177)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:176)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:176)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:127)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
        at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
        at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$createInMemoryFileIndex(DataSource.scala:533)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:371)
        at org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:176)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:95)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.Path
   Serialization stack:
        - object not serializable (class: org.apache.hadoop.fs.Path, value: gs://some_bucket/ts=2020-05-01/7d58a4f9-ebf2-49da-bf5e-c39b7a4213c8-0_125-34-16471_20201221064949.parquet)
        - writeObject data (class: java.util.HashSet)
        - object (class java.util.HashSet, [gs://some_bucket/ts=2020-05-01/7d58a4f9-ebf2-49da-bf5e-c39b7a4213c8-0_125-34-16471_20201221064949.parquet])
        - writeObject data (class: java.util.HashMap)
        - object (class java.util.HashMap, {*extremely large map here*})
        - field (class: org.apache.hudi.hadoop.HoodieROTablePathFilter, name: hoodiePathCache, type: interface java.util.Map)
        - object (class org.apache.hudi.hadoop.HoodieROTablePathFilter, org.apache.hudi.hadoop.HoodieROTablePathFilter@28576363)
        - field (class: org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$3, name: filter$1, type: interface org.apache.hadoop.fs.PathFilter)
        - object (class org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$3, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
        ... 52 more
   
   ```
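   
   Reading the trace: the failure happens while Spark's `InMemoryFileIndex` launches a distributed file-listing job (the `RDD.mapPartitions` frames). Spark only launches that job when the number of paths to list exceeds `spark.sql.sources.parallelPartitionDiscovery.threshold` (default 32), which would explain why a table with thousands of partitions is needed to reproduce this. The job's closure captures Hudi's `HoodieROTablePathFilter`, whose `hoodiePathCache` map holds `org.apache.hadoop.fs.Path` values, and `Path` does not implement `java.io.Serializable`. A possible mitigation sketch, under the assumption that keeping the listing on the driver avoids serializing the filter:
   
   ```
   # Possible mitigation sketch (an assumption, not a confirmed fix): raise the
   # parallel-discovery threshold so file listing stays on the driver and the
   # non-serializable HoodieROTablePathFilter is never shipped to executors.
   spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "100000")
   
   read_options = {"hoodie.datasource.query.type": "read_optimized"}
   spark.read.format("hudi").options(**read_options).load(path).count()
   ```
   
   Listing thousands of GCS partitions serially on the driver will be slower, so this trades listing speed for avoiding the serialization path.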
   
   I am setting "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" and "spark.sql.hive.convertMetastoreParquet" to "false".

