eric created HUDI-6523:
--------------------------

             Summary: Fix get valid checkpoint for current writer
                 Key: HUDI-6523
                 URL: https://issues.apache.org/jira/browse/HUDI-6523
             Project: Apache Hudi
          Issue Type: Bug
          Components: spark
            Reporter: eric
             Fix For: 0.14.0


23/07/11 16:50:57 INFO HoodieCompactor: Compactor compacting 
[CompactionOperation{baseInstantTime='20230711165027926', 
dataFileCommitTime=Option{val=20230711165027926}, 
deltaFileNames=[.00000001-2bae-40d8-8038-401eefb9e7e3-0_20230711165027926.log.1_1-53-670],
 
dataFileName=Option{val=00000001-2bae-40d8-8038-401eefb9e7e3-0_1-27-432_20230711165027926.parquet},
 id='HoodieFileGroupId{partitionPath='part=2023071116', 
fileId='00000001-2bae-40d8-8038-401eefb9e7e3-0'}', 
metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0, 
TOTAL_LOG_FILES_SIZE=3976.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0}, 
bootstrapFilePath=Optional.empty}, 
CompactionOperation{baseInstantTime='20230711165027926', 
dataFileCommitTime=Option{val=20230711165027926}, 
deltaFileNames=[.00000000-eb61-4788-a9cb-aaa67e2e47c4-0_20230711165027926.log.1_0-53-671],
 
dataFileName=Option{val=00000000-eb61-4788-a9cb-aaa67e2e47c4-0_0-27-431_20230711165027926.parquet},
 id='HoodieFileGroupId{partitionPath='part=2023071116', 
fileId='00000000-eb61-4788-a9cb-aaa67e2e47c4-0'}', 
metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0, 
TOTAL_LOG_FILES_SIZE=3592.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0}, 
bootstrapFilePath=Optional.empty}, 
CompactionOperation{baseInstantTime='20230711165027926', 
dataFileCommitTime=Option{val=20230711165027926}, 
deltaFileNames=[.00000002-7160-4515-a0a6-7bcf2e0cccc3-0_20230711165027926.log.1_2-53-673],
 
dataFileName=Option{val=00000002-7160-4515-a0a6-7bcf2e0cccc3-0_2-27-433_20230711165027926.parquet},
 id='HoodieFileGroupId{partitionPath='part=2023071116', 
fileId='00000002-7160-4515-a0a6-7bcf2e0cccc3-0'}', 
metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0, 
TOTAL_LOG_FILES_SIZE=3591.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0}, 
bootstrapFilePath=Optional.empty}, 
CompactionOperation{baseInstantTime='20230711165027926', 
dataFileCommitTime=Option{val=20230711165027926}, 
deltaFileNames=[.00000003-5a31-411f-8430-ccf4bec128e8-0_20230711165027926.log.1_3-53-672],
 
dataFileName=Option{val=00000003-5a31-411f-8430-ccf4bec128e8-0_3-27-434_20230711165027926.parquet},
 id='HoodieFileGroupId{partitionPath='part=2023071116', 
fileId='00000003-5a31-411f-8430-ccf4bec128e8-0'}', 
metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0, 
TOTAL_LOG_FILES_SIZE=3207.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0}, 
bootstrapFilePath=Optional.empty}] files
23/07/11 16:50:57 ERROR MicroBatchExecution: Query RateStreamSource [id = 
44581078-04ee-48ae-bc74-143b3c836a23, runId = 
916ce5a4-bd8a-4010-8c0e-869c58db41ab] terminated with error
org.apache.hudi.exception.HoodieIOException: Failed to parse 
HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:356)
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
        at 
org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
        at 
org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.io.IOException: unable to read commit metadata
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
        ... 35 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
'Objavro': was expecting (JSON String, Number, Array, Object or token 'null', 
'true' or 'false')
 at [Source: (String)"Obj\u0001\u0002\u0016avro.schema� 
{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type"[truncated
 3045 chars]; line: 1, column: 11]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
        at 
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
        at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
        ... 36 more
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_55_piece0 on 
io153:43928 in memory (size: 60.4 KiB, free: 910.5 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_55_piece0 on 
io153:59367 in memory (size: 60.4 KiB, free: 911.7 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_49_piece0 on 
io153:43928 in memory (size: 60.4 KiB, free: 910.6 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_49_piece0 on 
io153:59367 in memory (size: 60.4 KiB, free: 911.8 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_43_piece0 on 
io153:43928 in memory (size: 5.2 KiB, free: 910.6 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_43_piece0 on 
io148:39480 in memory (size: 5.2 KiB, free: 911.0 MiB)
Exception in thread "main" 
org.apache.spark.sql.streaming.StreamingQueryException: Failed to parse 
HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
=== Streaming Query ===
Identifier: RateStreamSource [id = 44581078-04ee-48ae-bc74-143b3c836a23, runId 
= 916ce5a4-bd8a-4010-8c0e-869c58db41ab]
Current Committed Offsets: {RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, 
numPartitions=4: 44}
Current Available Offsets: {RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, 
numPartitions=4: 53}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [value#1L AS user_id#4L, to_utc_timestamp(timestamp#0, Asia/Shanghai) 
AS create_date#5, (value#1L + cast(1 as bigint)) AS cut_id#6L, 
offer_user_relat_id AS offer_user_relat_id#7, offer_inst_id AS offer_inst_id#8, 
offer_id AS offer_id#9, src_system_type AS src_system_type#10, role_id AS 
role_id#11, is_main_offer AS is_main_offer#12, is_grp_main_user AS 
is_grp_main_user#13, state AS state#14, done_code AS done_code#15, 
date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS done_date#16, 
date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS effective_date#17, 
date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS expire_date#18, 30 
AS county_code#19, 20 AS op_id#20, 40 AS org_id#21, group_region_id AS 
group_region_id#22, user_region_id AS user_region_id#23, region_id AS 
region_id#24, effective_date_type AS effective_date_type#25, expire_date_type 
AS expire_date_type#26, remark AS remark#27, date_format(now(), yyyyMMddHH, 
Some(Asia/Shanghai)) AS part#28]
+- SubqueryAlias t
   +- View (`t`, [timestamp#0,value#1L])
      +- StreamingDataSourceV2Relation [timestamp#0, value#1L], 
org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1@6ca4c8b1,
 RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=4

        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:330)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to parse 
HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at 
java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:356)
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
        at 
org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
        at 
org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        ... 1 more
Caused by: java.io.IOException: unable to read commit metadata
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
        at 
org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
        ... 35 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
'Objavro': was expecting (JSON String, Number, Array, Object or token 'null', 
'true' or 'false')
 at [Source: (String)"Obj\u0001\u0002\u0016avro.schema� 
{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type"[truncated
 3045 chars]; line: 1, column: 11]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
        at 
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
        at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
        at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
        at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
        ... 36 more
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_50_piece0 on 
io148:39480 in memory (size: 60.4 KiB, free: 911.1 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_50_piece0 on 
io153:43928 in memory (size: 60.4 KiB, free: 910.6 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_48_piece0 on 
io148:39480 in memory (size: 253.6 KiB, free: 911.3 MiB)
23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_48_piece0 on 
io153:43928 in memory (size: 253.6 KiB, free: 910.9 MiB)
23/07/11 16:50:57 INFO SparkContext: Invoking stop() from shutdown hook




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to