[ https://issues.apache.org/jira/browse/HUDI-6523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-6523:
---------------------------------
Labels: pull-request-available (was: )
> Fix get valid checkpoint for current writer
> -------------------------------------------
>
> Key: HUDI-6523
> URL: https://issues.apache.org/jira/browse/HUDI-6523
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark
> Reporter: eric
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.14.0
>
>
> 23/07/11 16:50:57 INFO HoodieCompactor: Compactor compacting
> [CompactionOperation{baseInstantTime='20230711165027926',
> dataFileCommitTime=Option{val=20230711165027926},
> deltaFileNames=[.00000001-2bae-40d8-8038-401eefb9e7e3-0_20230711165027926.log.1_1-53-670],
>
> dataFileName=Option{val=00000001-2bae-40d8-8038-401eefb9e7e3-0_1-27-432_20230711165027926.parquet},
> id='HoodieFileGroupId{partitionPath='part=2023071116',
> fileId='00000001-2bae-40d8-8038-401eefb9e7e3-0'}',
> metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0,
> TOTAL_LOG_FILES_SIZE=3976.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0},
> bootstrapFilePath=Optional.empty},
> CompactionOperation{baseInstantTime='20230711165027926',
> dataFileCommitTime=Option{val=20230711165027926},
> deltaFileNames=[.00000000-eb61-4788-a9cb-aaa67e2e47c4-0_20230711165027926.log.1_0-53-671],
>
> dataFileName=Option{val=00000000-eb61-4788-a9cb-aaa67e2e47c4-0_0-27-431_20230711165027926.parquet},
> id='HoodieFileGroupId{partitionPath='part=2023071116',
> fileId='00000000-eb61-4788-a9cb-aaa67e2e47c4-0'}',
> metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0,
> TOTAL_LOG_FILES_SIZE=3592.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0},
> bootstrapFilePath=Optional.empty},
> CompactionOperation{baseInstantTime='20230711165027926',
> dataFileCommitTime=Option{val=20230711165027926},
> deltaFileNames=[.00000002-7160-4515-a0a6-7bcf2e0cccc3-0_20230711165027926.log.1_2-53-673],
>
> dataFileName=Option{val=00000002-7160-4515-a0a6-7bcf2e0cccc3-0_2-27-433_20230711165027926.parquet},
> id='HoodieFileGroupId{partitionPath='part=2023071116',
> fileId='00000002-7160-4515-a0a6-7bcf2e0cccc3-0'}',
> metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0,
> TOTAL_LOG_FILES_SIZE=3591.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0},
> bootstrapFilePath=Optional.empty},
> CompactionOperation{baseInstantTime='20230711165027926',
> dataFileCommitTime=Option{val=20230711165027926},
> deltaFileNames=[.00000003-5a31-411f-8430-ccf4bec128e8-0_20230711165027926.log.1_3-53-672],
>
> dataFileName=Option{val=00000003-5a31-411f-8430-ccf4bec128e8-0_3-27-434_20230711165027926.parquet},
> id='HoodieFileGroupId{partitionPath='part=2023071116',
> fileId='00000003-5a31-411f-8430-ccf4bec128e8-0'}',
> metrics={TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=0.0,
> TOTAL_LOG_FILES_SIZE=3207.0, TOTAL_IO_WRITE_MB=0.0, TOTAL_IO_MB=0.0},
> bootstrapFilePath=Optional.empty}] files
> 23/07/11 16:50:57 ERROR MicroBatchExecution: Query RateStreamSource [id =
> 44581078-04ee-48ae-bc74-143b3c836a23, runId =
> 916ce5a4-bd8a-4010-8c0e-869c58db41ab] terminated with error
> org.apache.hudi.exception.HoodieIOException: Failed to parse
> HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
> at
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:356)
> at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
> at
> org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
> at
> org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
> at
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
> Caused by: java.io.IOException: unable to read commit metadata
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
> at
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
> ... 35 more
> Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'Objavro': was expecting (JSON String, Number, Array, Object or token 'null',
> 'true' or 'false')
> at [Source: (String)"Obj\u0001\u0002\u0016avro.schema�
> {"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type"[truncated
> 3045 chars]; line: 1, column: 11]
> at
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
> at
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
> at
> com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
> at
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
> ... 36 more
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_55_piece0 on
> io153:43928 in memory (size: 60.4 KiB, free: 910.5 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_55_piece0 on
> io153:59367 in memory (size: 60.4 KiB, free: 911.7 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_49_piece0 on
> io153:43928 in memory (size: 60.4 KiB, free: 910.6 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_49_piece0 on
> io153:59367 in memory (size: 60.4 KiB, free: 911.8 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_43_piece0 on
> io153:43928 in memory (size: 5.2 KiB, free: 910.6 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_43_piece0 on
> io148:39480 in memory (size: 5.2 KiB, free: 911.0 MiB)
> Exception in thread "main"
> org.apache.spark.sql.streaming.StreamingQueryException: Failed to parse
> HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
> === Streaming Query ===
> Identifier: RateStreamSource [id = 44581078-04ee-48ae-bc74-143b3c836a23,
> runId = 916ce5a4-bd8a-4010-8c0e-869c58db41ab]
> Current Committed Offsets: {RateStreamV2[rowsPerSecond=1,
> rampUpTimeSeconds=0, numPartitions=4: 44}
> Current Available Offsets: {RateStreamV2[rowsPerSecond=1,
> rampUpTimeSeconds=0, numPartitions=4: 53}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> Project [value#1L AS user_id#4L, to_utc_timestamp(timestamp#0, Asia/Shanghai)
> AS create_date#5, (value#1L + cast(1 as bigint)) AS cut_id#6L,
> offer_user_relat_id AS offer_user_relat_id#7, offer_inst_id AS
> offer_inst_id#8, offer_id AS offer_id#9, src_system_type AS
> src_system_type#10, role_id AS role_id#11, is_main_offer AS is_main_offer#12,
> is_grp_main_user AS is_grp_main_user#13, state AS state#14, done_code AS
> done_code#15, date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS
> done_date#16, date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS
> effective_date#17, date_format(now(), yyyyMMddHHmmss, Some(Asia/Shanghai)) AS
> expire_date#18, 30 AS county_code#19, 20 AS op_id#20, 40 AS org_id#21,
> group_region_id AS group_region_id#22, user_region_id AS user_region_id#23,
> region_id AS region_id#24, effective_date_type AS effective_date_type#25,
> expire_date_type AS expire_date_type#26, remark AS remark#27,
> date_format(now(), yyyyMMddHH, Some(Asia/Shanghai)) AS part#28]
> +- SubqueryAlias t
> +- View (`t`, [timestamp#0,value#1L])
> +- StreamingDataSourceV2Relation [timestamp#0, value#1L],
> org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1@6ca4c8b1,
> RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=4
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:330)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
> Caused by: org.apache.hudi.exception.HoodieIOException: Failed to parse
> HoodieCommitMetadata for [==>20230711165055791__compaction__REQUESTED]
> at
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:173)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:356)
> at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
> at
> org.apache.hudi.common.util.CommitUtils.getValidCheckpointForCurrentWriter(CommitUtils.java:175)
> at
> org.apache.hudi.HoodieStreamingSink.canSkipBatch(HoodieStreamingSink.scala:313)
> at
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:104)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
> ... 1 more
> Caused by: java.io.IOException: unable to read commit metadata
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:496)
> at
> org.apache.hudi.common.util.CommitUtils.lambda$getValidCheckpointForCurrentWriter$1(CommitUtils.java:163)
> ... 35 more
> Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'Objavro': was expecting (JSON String, Number, Array, Object or token 'null',
> 'true' or 'false')
> at [Source: (String)"Obj\u0001\u0002\u0016avro.schema�
> {"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type"[truncated
> 3045 chars]; line: 1, column: 11]
> at
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
> at
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
> at
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
> at
> com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
> at
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:240)
> at
> org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:494)
> ... 36 more
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_50_piece0 on
> io148:39480 in memory (size: 60.4 KiB, free: 911.1 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_50_piece0 on
> io153:43928 in memory (size: 60.4 KiB, free: 910.6 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_48_piece0 on
> io148:39480 in memory (size: 253.6 KiB, free: 911.3 MiB)
> 23/07/11 16:50:57 INFO BlockManagerInfo: Removed broadcast_48_piece0 on
> io153:43928 in memory (size: 253.6 KiB, free: 910.9 MiB)
> 23/07/11 16:50:57 INFO SparkContext: Invoking stop() from shutdown hook
--
This message was sent by Atlassian Jira
(v8.20.10#820010)