QuChunhe opened a new issue, #6430:
URL: https://github.com/apache/hudi/issues/6430

   1. A Java client writes complex-typed data, such as ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>>, with the following table-initialization and write-client configuration:
   
    private String baseFileFormat = "parquet";

    Path path = new Path(tablePath);
    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);

    String schema = Util.getStringFromResource("/" + databasePrefix + "."
        + tableName + ".schema");
    if (!fs.exists(path)) {
      HoodieTableMetaClient.withPropertyBuilder()
          .setBaseFileFormat(baseFileFormat)
          .setPartitionFields(partitionFields)
          .setHiveStylePartitioningEnable(false)
          .setTableCreateSchema(schema)
          .setTableType(HoodieTableType.MERGE_ON_READ)
          .setRecordKeyFields(recordKeyFields)
          .setPayloadClassName(DefaultHoodieRecordPayload.class.getName())
          .setTableName(tableName)
          .initTable(hadoopConf, tablePath);
    }
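
   For reference, here is a minimal sketch (not from the issue; the record name "element" and the variable names are illustrative) of what the Avro schema for the complex field ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>> looks like when built programmatically with Avro's SchemaBuilder, instead of being loaded from the ".schema" resource file:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    // Element type of the array: ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>.
    Schema element = SchemaBuilder.record("element").fields()
        .name("c1").type().doubleType().noDefault()
        .name("c2").type().longType().noDefault()
        .name("c3").type().longType().noDefault()
        .endRecord();
    // The complex column itself: an Avro array of the record above.
    Schema arrayField = Schema.createArray(element);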
   
       // Create the write client to write some records in
       HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
           .withPath(tablePath)
           .withAutoCommit(true)
           .withEmbeddedTimelineServerEnabled(false)
           .withRollbackUsingMarkers(false)
           .withBulkInsertParallelism(parallelism)
           .withSchema(schema)
           //.withSchemaEvolutionEnable(true)
           .withParallelism(parallelism, parallelism)
           .withDeleteParallelism(1)
           //.withEngineType(EngineType.SPARK)
           .forTable(tableName)
           .withMetadataConfig(
               HoodieMetadataConfig.newBuilder()
                   .enable(true)
                   .build())
           .withConsistencyGuardConfig(
               ConsistencyGuardConfig.newBuilder()
                   .withEnableOptimisticConsistencyGuard(false)
                   .build())
           .withIndexConfig(
               HoodieIndexConfig.newBuilder()
                   .withIndexType(IndexType.BLOOM)
                   .build())
           .withCompactionConfig(
               HoodieCompactionConfig.newBuilder()
                   .withAutoArchive(true)
                   .withAutoClean(true)
                   .withCompactionLazyBlockReadEnabled(true)
                   .withAsyncClean(true)
                   .build())
           .build();
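
   The issue text does not include the actual write call; the following is a minimal sketch (assumed, following the usual hudi-java-client pattern; "records" is a hypothetical, already-prepared list of rows carrying the ARRAY<ROW<...>> field) of how the client configured above would be driven:

    import java.util.List;
    import org.apache.hudi.client.HoodieJavaWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.client.common.HoodieJavaEngineContext;
    import org.apache.hudi.common.model.DefaultHoodieRecordPayload;

    // Hypothetical driver: "records" is assumed to be a prepared
    // List<HoodieRecord<DefaultHoodieRecordPayload>>.
    HoodieJavaWriteClient<DefaultHoodieRecordPayload> client =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
    String instantTime = client.startCommit();
    List<WriteStatus> statuses = client.upsert(records, instantTime);
    client.close();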
   
   
   2. Flink SQL cannot read the data back (the failure occurs in the MergeOnReadInputFormat read path) and throws the following errors:
   
   java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
        at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_332]
        at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_332]
        at org.apache.parquet.schema.GroupType.getType(GroupType.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:514) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:480) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:406) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:147) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getReader(MergeOnReadInputFormat.java:306) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:177) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:81) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
   2022-08-18 11:43:18,280 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job c2884af4f937a04ef4ae783dfa1e5a1c
   2022-08-18 11:43:18,282 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
   2022-08-18 11:43:18,282 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
   2022-08-18 11:43:18,285 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (c2884af4f937a04ef4ae783dfa1e5a1c) switched from state RUNNING to FAILING.
   org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_332]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_332]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_332]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.6.jar:1.13.6]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.6.jar:1.13.6]
   Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
        at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_332]
        at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_332]
        at org.apache.parquet.schema.GroupType.getType(GroupType.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:514) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:480) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:406) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:147) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getReader(MergeOnReadInputFormat.java:306) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:177) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:81) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
   2022-08-18 11:43:18,290 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (c2884af4f937a04ef4ae783dfa1e5a1c) switched from state FAILING to FAILED.
   
   
   
   
   The table is declared with the following properties:
   
   
   WITH (
     'connector' = 'hudi',
     'hoodie.datasource.write.hive_style_partitioning'='false',
     'table.type' = 'MERGE_ON_READ',
     'path' = 'oss://**',
     'hoodie.table.version'='4',
     'hoodie.table.base.file.format'='parquet',
     'read.tasks'='1',
     'write.tasks'='1',
     'write.bucket_assign.tasks'='1',
     'compaction.tasks'='1'
     );
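
   For completeness, here is a minimal sketch (the table name, column names, and statements are hypothetical; only the properties above come from the issue) of how such a table could be declared and queried from a Flink 1.13 Java program, which is the read path that fails:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // Hypothetical schema; the array column mirrors the complex type from step 1.
    tEnv.executeSql(
        "CREATE TABLE t1 ("
            + " uuid STRING,"
            + " arr ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>>,"
            + " pt STRING,"
            + " PRIMARY KEY (uuid) NOT ENFORCED"
            + ") PARTITIONED BY (pt) WITH ("
            + " 'connector' = 'hudi',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'path' = 'oss://**'"
            + ")");
    // Selecting the array column is what triggers the
    // IndexOutOfBoundsException shown above.
    tEnv.executeSql("SELECT * FROM t1").print();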
   
   Hudi 0.11.0 and 0.11.1 produce the same errors.

