hj2016 opened a new pull request #2814: URL: https://github.com/apache/hudi/pull/2814
## *Tips*
- *Thank you very much for contributing to Apache Hudi.*
- *Please review https://hudi.apache.org/contributing.html before opening a pull request.*

## What is the purpose of the pull request

Fix a flink-client query error when processing files larger than 128 MB.

Querying a COW table with the Flink client reports an error. The error message is as follows:

`Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.fs.HdfsBlockLocation cannot be cast to java.lang.Comparable
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:260)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
    ... 4 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.fs.HdfsBlockLocation cannot be cast to java.lang.Comparable
    at java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320)
    at java.util.ComparableTimSort.sort(ComparableTimSort.java:188)
    at java.util.Arrays.sort(Arrays.java:1246)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:212)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:64)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247)
    ... 18 more`

The cause is that the FileSystem in use is the Hadoop implementation, and the BlockLocation class returned by its getFileBlockLocations method (HdfsBlockLocation in the trace above) does not implement Comparable, so it has no compareTo method. CopyOnWriteInputFormat.createInputSplits sorts the blocks with Arrays.sort, so the comparison fails with a ClassCastException whenever a file has more than one block, for example a file larger than a 128 MB block size.

I think the CopyOnWriteInputFormat class imitates FileInputFormat for obtaining and sorting block splits. FileInputFormat obtains the block information as HadoopBlockLocation objects, and HadoopBlockLocation implements the compareTo method, so sorting works there. This PR modifies the implementation passed in for sorting (an inner class) so that the blocks can be sorted.

## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
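
The failure described in the pull request can be reproduced without Hadoop or Flink. The following is a minimal sketch, not the actual patch: `BlockSortSketch` and `Block` are hypothetical names, with `Block` standing in for a block location class that does not implement `Comparable`. It illustrates why `Arrays.sort` only fails once there is more than one block, and shows one possible way to sort such objects (an explicit `Comparator` on the offset), which may differ from what the PR does.

```java
import java.util.Arrays;
import java.util.Comparator;

public class BlockSortSketch {

    // Hypothetical stand-in for a block location that does NOT implement Comparable.
    static final class Block {
        final long offset;
        final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Returns true if sorting by natural ordering throws ClassCastException.
    static boolean naturalSortFails(Block[] blocks) {
        try {
            // Arrays.sort(Object[]) casts elements to Comparable when it has to compare them.
            Arrays.sort(blocks.clone());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Sorts a copy of the blocks by offset with an explicit comparator and returns the offsets.
    static long[] sortedOffsets(Block[] blocks) {
        Block[] copy = blocks.clone();
        Arrays.sort(copy, Comparator.comparingLong((Block b) -> b.offset));
        return Arrays.stream(copy).mapToLong(b -> b.offset).toArray();
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed 128 MB block size

        // A small file: one block, so nothing is ever compared.
        Block[] oneBlock = { new Block(0L, 1024L) };
        // A larger file: two blocks, deliberately out of order.
        Block[] twoBlocks = { new Block(blockSize, 1024L), new Block(0L, blockSize) };

        System.out.println("one block, natural sort fails:  " + naturalSortFails(oneBlock));
        System.out.println("two blocks, natural sort fails: " + naturalSortFails(twoBlocks));
        System.out.println("two blocks, comparator sort:    " + Arrays.toString(sortedOffsets(twoBlocks)));
    }
}
```

With a single block the sort returns before comparing anything, which is consistent with the error only appearing for files that span several blocks.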
