[ https://issues.apache.org/jira/browse/HUDI-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478216#comment-17478216 ]

Ethan Guo commented on HUDI-3268:
---------------------------------

New stack trace:
{code:java}
scala> val basePath = "/tmp/hoodie/hudi-test-topic"
basePath: String = /tmp/hoodie/hudi-test-topic


scala> val df = spark.read.format("hudi").load(basePath)
df: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 15 more fields]


scala> df.count
java.lang.NullPointerException
  at org.apache.hudi.SparkHoodieTableFileIndex._partitionSchemaFromProperties$lzycompute(SparkHoodieTableFileIndex.scala:83)
  at org.apache.hudi.SparkHoodieTableFileIndex._partitionSchemaFromProperties(SparkHoodieTableFileIndex.scala:77)
  at org.apache.hudi.SparkHoodieTableFileIndex.partitionSchema(SparkHoodieTableFileIndex.scala:118)
  at org.apache.hudi.SparkHoodieTableFileIndex.parsePartitionColumnValues(SparkHoodieTableFileIndex.scala:218)
  at org.apache.hudi.AbstractHoodieTableFileIndex.$anonfun$getAllQueryPartitionPaths$2(AbstractHoodieTableFileIndex.scala:229)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.hudi.AbstractHoodieTableFileIndex.getAllQueryPartitionPaths(AbstractHoodieTableFileIndex.scala:228)
  at org.apache.hudi.AbstractHoodieTableFileIndex.loadPartitionPathFiles(AbstractHoodieTableFileIndex.scala:184)
  at org.apache.hudi.AbstractHoodieTableFileIndex.refresh0(AbstractHoodieTableFileIndex.scala:107)
  at org.apache.hudi.AbstractHoodieTableFileIndex.<init>(AbstractHoodieTableFileIndex.scala:87)
  at org.apache.hudi.SparkHoodieTableFileIndex.<init>(SparkHoodieTableFileIndex.scala:59)
  at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:70)
  at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:192)
  at org.apache.hudi.MergeOnReadSnapshotRelation.buildScan(MergeOnReadSnapshotRelation.scala:111)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
  at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
  at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:468)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:157)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:157)
  at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:150)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:150)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:170)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:170)
  at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
  at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:3011)
  ... 47 elided {code}
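
The NPE above is raised while SparkHoodieTableFileIndex builds the partition schema from the table's own configuration, so one quick check (not part of the original report) is to dump the table's hoodie.properties and see whether the Kafka Connect sink wrote a partition-fields entry at all. The property key used below, hoodie.table.partition.fields, is an assumption based on Hudi's table-config naming; a missing or empty value there would be consistent with this stack trace.
{code:scala}
// Diagnostic sketch only: dump .hoodie/hoodie.properties from the same
// spark-shell session and print the partition-fields entry. The key
// "hoodie.table.partition.fields" is assumed, not confirmed by this report.
import java.util.Properties
import org.apache.hadoop.fs.Path

val propsPath = new Path(s"$basePath/.hoodie/hoodie.properties")
val fs = propsPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val props = new Properties()
val in = fs.open(propsPath)
try props.load(in) finally in.close()

// null or empty output here would point at missing partition-field metadata.
println(props.getProperty("hoodie.table.partition.fields"))
{code}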

> Spark datasource cannot read MOR table written by Kafka Connect Sink
> --------------------------------------------------------------------
>
>                 Key: HUDI-3268
>                 URL: https://issues.apache.org/jira/browse/HUDI-3268
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: Ethan Guo
>            Priority: Blocker
>             Fix For: 0.11.0
>
>
> I'm hitting this on master when reading a MOR table written by the Kafka Connect Sink via the Spark datasource:
> {code:java}
> scala> val basePath = "/tmp/hoodie/hudi-test-topic"
> basePath: String = /tmp/hoodie/hudi-test-topic
> scala> val df = spark.read.format("hudi").load(basePath)
> org.apache.hudi.spark.org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type null.
>   at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:199)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.liftedTree1$1(MergeOnReadSnapshotRelation.scala:74)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.tableAvroSchema$lzycompute(MergeOnReadSnapshotRelation.scala:69)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.tableAvroSchema(MergeOnReadSnapshotRelation.scala:68)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.tableStructSchema$lzycompute(MergeOnReadSnapshotRelation.scala:78)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.tableStructSchema(MergeOnReadSnapshotRelation.scala:78)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.schema(MergeOnReadSnapshotRelation.scala:97)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:440)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
>   ... 47 elided {code}
> I hit the same exception on Spark 2.4.4, 3.1.2, and 3.2.0.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
