[ 
https://issues.apache.org/jira/browse/HUDI-6858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-6858:
----------------------------
    Description: 
Spark Structured streaming fails due to commit metadata parsing

{code:java}
Failed to parse HoodieCommitMetadata for 
[==>20230902092834911__compaction__REQUESTED]
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to parse 
HoodieCommitMetadata for [==>20230902092834911__compaction__REQUESTED]
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
        at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
~[?:?]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) ~[?:?]
        at 
org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
 ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]

Caused by: java.io.IOException: unable to read commit metadata
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
        at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
~[?:?]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) ~[?:?]
        at 
org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
 ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
'Objavro': was expecting (JSON String, Number, Array, Object or token 'null', 
'true' or 'false')
 at [Source: (StringReader); line: 1, column: 11]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) 
~[jackson-core-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
 ~[jackson-core-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
 ~[jackson-core-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
 ~[jackson-core-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
 ~[jackson-core-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
 ~[jackson-databind-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
 ~[jackson-databind-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) 
~[jackson-databind-2.13.4.jar:2.13.4]
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597) 
~[jackson-databind-2.13.4.jar:2.13.4]
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[?:?]
        at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
~[?:?]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
~[?:?]
        at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) ~[?:?]
        at 
org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
 ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
 ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
 ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
{code}


> Fix checkpoint reading in Spark structured streaming
> ----------------------------------------------------
>
>                 Key: HUDI-6858
>                 URL: https://issues.apache.org/jira/browse/HUDI-6858
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.13.1
>            Reporter: Ethan Guo
>            Assignee: Ethan Guo
>            Priority: Blocker
>             Fix For: 0.14.0
>
>
> Spark Structured streaming fails due to commit metadata parsing
> {code:java}
> Failed to parse HoodieCommitMetadata for 
> [==>20230902092834911__compaction__REQUESTED]
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
> Caused by: org.apache.hudi.exception.HoodieIOException: Failed to parse 
> HoodieCommitMetadata for [==>20230902092834911__compaction__REQUESTED]
>       at 
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) 
> ~[?:?]
>       at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528)
>  ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
> ~[?:?]
>       at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
> ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>       at 
> java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) 
> ~[?:?]
>       at 
> org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
> ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>  ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
> Caused by: java.io.IOException: unable to read commit metadata
>       at 
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) 
> ~[?:?]
>       at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528)
>  ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
> ~[?:?]
>       at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
> ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>       at 
> java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) 
> ~[?:?]
>       at 
> org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
> ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>  ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
> 'Objavro': was expecting (JSON String, Number, Array, Object or token 'null', 
> 'true' or 'false')
>  at [Source: (StringReader); line: 1, column: 11]
>       at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) 
> ~[jackson-core-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
>  ~[jackson-core-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
>  ~[jackson-core-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
>  ~[jackson-core-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
>  ~[jackson-core-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
>  ~[jackson-databind-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
>  ~[jackson-databind-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) 
> ~[jackson-databind-2.13.4.jar:2.13.4]
>       at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597) 
> ~[jackson-databind-2.13.4.jar:2.13.4]
>       at 
> org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) 
> ~[?:?]
>       at 
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:528)
>  ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) 
> ~[?:?]
>       at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) 
> ~[?:?]
>       at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>       at 
> java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647) 
> ~[?:?]
>       at 
> org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
>  ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104) 
> ~[hudi-spark3-bundle_2.12-0.13.1-amzn-0.jar:0.13.1-amzn-0]
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
>  ~[spark-sql_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>  ~[spark-catalyst_2.12-3.4.0-amzn-0.jar:3.4.0-amzn-0]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to