raghunittala opened a new issue, #9596:
URL: https://github.com/apache/hudi/issues/9596
Hi Team,
I have a Flink job that consumes Protobuf messages from Kafka and writes them to a Hudi table on S3 object storage. Here are a few issues I'm facing while trying to do so:
1. The job runs for some time, and when it tries to compact the files it throws a ClassCastException. Here is the complete stack trace:
```
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20230824070324708] error
    ... 6 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCompactionPlan(TimelineMetadataUtils.java:169)
    at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:191)
    at org.apache.hudi.common.util.CompactionUtils.lambda$getCompactionPlansByTimeline$4(CompactionUtils.java:163)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at org.apache.hudi.common.util.CompactionUtils.getCompactionPlansByTimeline(CompactionUtils.java:164)
    at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionPlans(CompactionUtils.java:133)
    at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionOperations(CompactionUtils.java:207)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:120)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:113)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:107)
    at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:177)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:272)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:115)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
    at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:114)
    at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:320)
    at org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator.generateCompactionPlan(BaseHoodieCompactionPlanGenerator.java:92)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:147)
    at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:113)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.scheduleCompaction(HoodieFlinkMergeOnReadTable.java:105)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:421)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:393)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableService(BaseHoodieWriteClient.java:1097)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompactionAtInstant(BaseHoodieWriteClient.java:876)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompaction(BaseHoodieWriteClient.java:867)
    at org.apache.hudi.util.CompactionUtil.scheduleCompaction(CompactionUtil.java:65)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:250)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
```
I'm also copying the hudi-flink1.16-bundle jar into the flink/plugins/hudi folder.
2. I'm not seeing any Parquet files being created in S3; I only see .log files. I suspect that, because of the above exception, compaction couldn't run, and so the Parquet files are never written.
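For context, the Hudi sink DDL looks roughly like this (a simplified sketch reconstructed from the hoodie.properties below; the column types, the S3 path, and the two compaction options are illustrative, not copied from my actual job):

```
-- Sketch of the Hudi MOR sink; keys, precombine field, and partitioning
-- match hoodie.properties, everything else is illustrative.
CREATE TABLE ad_results_sink_table (
  record_key STRING,
  start_time_unix_nano BIGINT,
  event_id STRING,
  event_type STRING,
  PRIMARY KEY (record_key) NOT ENFORCED
) PARTITIONED BY (event_id, event_type) WITH (
  'connector' = 'hudi',
  'path' = 's3a://<bucket>/ad_results_sink_table',  -- placeholder path
  'table.type' = 'MERGE_ON_READ',
  'precombine.field' = 'start_time_unix_nano',
  'compaction.async.enabled' = 'true',  -- compaction scheduled on checkpoint complete
  'compaction.delta_commits' = '5'      -- compact after every 5 delta commits
);
```

With async compaction enabled on a MERGE_ON_READ table, Parquet base files should only appear after compaction succeeds, which matches the failure I'm seeing.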
Here is the hoodie.properties:
```
hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
hoodie.table.type=MERGE_ON_READ
hoodie.table.precombine.field=start_time_unix_nano
hoodie.table.partition.fields=event_id,event_type
hoodie.table.cdc.enabled=false
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.checksum=3942898242
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=record_key
hoodie.table.name=ad_results_sink_table
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=5
```
- Hudi version: 0.13.1
- Flink version: 1.17.1
- Hadoop version: 3.3.5
- Storage: S3
- Running on Docker?: yes