raghunittala opened a new issue, #9596:
URL: https://github.com/apache/hudi/issues/9596

   Hi Team,
   
   I have a Flink job that consumes Protobuf messages from Kafka and writes them to a Hudi table in S3 object storage. Here are a few issues I'm facing:
   
   1. The job runs for some time, and when it tries to compact the files it throws a ClassCastException. Here is the complete stack trace:
   ```
   Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20230824070324708] error
       ... 6 more
   Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app')
       at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
       at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCompactionPlan(TimelineMetadataUtils.java:169)
       at org.apache.hudi.common.util.CompactionUtils.getCompactionPlan(CompactionUtils.java:191)
       at org.apache.hudi.common.util.CompactionUtils.lambda$getCompactionPlansByTimeline$4(CompactionUtils.java:163)
       at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
       at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
       at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
       at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
       at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
       at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
       at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
       at org.apache.hudi.common.util.CompactionUtils.getCompactionPlansByTimeline(CompactionUtils.java:164)
       at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionPlans(CompactionUtils.java:133)
       at org.apache.hudi.common.util.CompactionUtils.getAllPendingCompactionOperations(CompactionUtils.java:207)
       at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:120)
       at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:113)
       at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:107)
       at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:177)
       at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:272)
       at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:115)
       at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
       at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:114)
       at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:320)
       at org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator.generateCompactionPlan(BaseHoodieCompactionPlanGenerator.java:92)
       at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.scheduleCompaction(ScheduleCompactionActionExecutor.java:147)
       at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:113)
       at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.scheduleCompaction(HoodieFlinkMergeOnReadTable.java:105)
       at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:421)
       at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:393)
       at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableService(BaseHoodieWriteClient.java:1097)
       at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompactionAtInstant(BaseHoodieWriteClient.java:876)
       at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompaction(BaseHoodieWriteClient.java:867)
       at org.apache.hudi.util.CompactionUtil.scheduleCompaction(CompactionUtil.java:65)
       at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:250)
       at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
   ```
   I'm also copying the hudi-flink1.16-bundle into the flink/plugins/hudi folder.
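   
   A small diagnostic sketch that might help narrow this down, assuming (not confirmed) that the cast failure is related to where the two Avro classes named in the exception are loaded from. The class name `ClassOriginCheck` and its `describe` helper are hypothetical; only standard JDK calls are used. It prints the class loader and the code location for each class name it is given:

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical helper, not part of Hudi, Flink, or Avro.
final class ClassOriginCheck {

    // Describes which loader and which location (usually a jar) a class comes from.
    static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className);
            ClassLoader loader = cls.getClassLoader();
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return className
                + " | loader=" + (loader == null ? "bootstrap" : loader.toString())
                + " | location=" + (location == null ? "unknown" : location.toString());
        } catch (ClassNotFoundException e) {
            return className + " | not found on classpath";
        }
    }

    public static void main(String[] args) {
        // Defaults are the two classes named in the ClassCastException above.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.avro.generic.GenericData$Record",
            "org.apache.avro.specific.SpecificRecordBase"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

   To be meaningful, this would need to run with the same classpath as the Flink job; if the two classes report different jars, that may point to more than one copy of Avro being present.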
   
   I’m not seeing any Parquet files being created in S3, only .log files. I suspect that compaction could not run because of the above exception, so no Parquet files were created.
   
   Here is the hoodie.properties:
   
   ```
   hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
   hoodie.table.type=MERGE_ON_READ
   hoodie.table.precombine.field=start_time_unix_nano
   hoodie.table.partition.fields=event_id,event_type
   hoodie.table.cdc.enabled=false
   hoodie.archivelog.folder=archived
   hoodie.timeline.layout.version=1
   hoodie.table.checksum=3942898242
   hoodie.datasource.write.drop.partition.columns=false
   hoodie.table.recordkey.fields=record_key
   hoodie.table.name=ad_results_sink_table
   hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
   hoodie.datasource.write.hive_style_partitioning=false
   hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
   hoodie.datasource.write.partitionpath.urlencode=false
   hoodie.table.version=5
   ```
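   
   For reference, a minimal sketch of how a few of these values could be read back and checked with the JDK's `java.util.Properties`. The class `HoodiePropsCheck` and its methods are hypothetical names for illustration, and the text passed in `main` is a subset of the file above. The table type is the relevant part here: the table is MERGE_ON_READ, which would fit seeing only .log files while compaction is not completing.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Hudi.
final class HoodiePropsCheck {

    // Parses properties text; leading whitespace on each line is ignored by Properties.load.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when hoodie.table.type is MERGE_ON_READ.
    static boolean isMergeOnRead(Properties props) {
        return "MERGE_ON_READ".equals(props.getProperty("hoodie.table.type"));
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "hoodie.table.type=MERGE_ON_READ",
            "hoodie.table.version=5",
            "hoodie.table.recordkey.fields=record_key",
            "hoodie.table.partition.fields=event_id,event_type");
        Properties props = parse(text);
        System.out.println("merge-on-read: " + isMergeOnRead(props));
        System.out.println("table version: " + props.getProperty("hoodie.table.version"));
        System.out.println("partition fields: " + props.getProperty("hoodie.table.partition.fields"));
    }
}
```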
   
   - Hudi version: 0.13.1
   - Flink version: 1.17.1
   - Hadoop version: 3.3.5
   - Storage: S3
   - Running on Docker?: yes
   

