QuChunhe opened a new issue, #6430:
URL: https://github.com/apache/hudi/issues/6430
1. A Java client writes complex-type data, such as ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>>.
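For context, the schema resource loaded in the snippet below presumably carries an Avro schema containing such a column. A minimal sketch of an equivalent schema built with Avro's SchemaBuilder (the record and field names here are hypothetical illustrations, not taken from the issue):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Element type of the array column: ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>.
Schema element = SchemaBuilder.record("element").fields()
    .optionalDouble("c1")
    .optionalLong("c2")
    .optionalLong("c3")
    .endRecord();

// Hypothetical table schema: a key field, an ordering field, and the array column.
Schema tableSchema = SchemaBuilder.record("record").fields()
    .requiredString("uuid")
    .requiredLong("ts")
    .name("items").type().array().items(element).noDefault()
    .endRecord();

With a schema along those lines, the table is initialized and the write client configured as follows: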
private String baseFileFormat = "parquet";

Path path = new Path(tablePath);
FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
String schema = Util.getStringFromResource("/" + databasePrefix + "." + tableName + ".schema");
if (!fs.exists(path)) {
  HoodieTableMetaClient.withPropertyBuilder()
      .setBaseFileFormat(baseFileFormat)
      .setPartitionFields(partitionFields)
      .setHiveStylePartitioningEnable(false)
      .setTableCreateSchema(schema)
      .setTableType(HoodieTableType.MERGE_ON_READ)
      .setRecordKeyFields(recordKeyFields)
      .setPayloadClassName(DefaultHoodieRecordPayload.class.getName())
      .setTableName(tableName)
      .initTable(hadoopConf, tablePath);
}

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(tablePath)
    .withAutoCommit(true)
    .withEmbeddedTimelineServerEnabled(false)
    .withRollbackUsingMarkers(false)
    .withBulkInsertParallelism(parallelism)
    .withSchema(schema)
    // .withSchemaEvolutionEnable(true)
    .withParallelism(parallelism, parallelism)
    .withDeleteParallelism(1)
    // .withEngineType(EngineType.SPARK)
    .forTable(tableName)
    .withMetadataConfig(HoodieMetadataConfig.newBuilder()
        .enable(true)
        .build())
    .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
        .withEnableOptimisticConsistencyGuard(false)
        .build())
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(IndexType.BLOOM)
        .build())
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoArchive(true)
        .withAutoClean(true)
        .withCompactionLazyBlockReadEnabled(true)
        .withAsyncClean(true)
        .build())
    .build();
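With that config, the actual write would look roughly like the following. This is a minimal sketch assuming the standard hudi-java-client API; the field names ("uuid", "ts", "items") and the single-record list are hypothetical illustrations, not from the issue:

import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;

// Parse the same schema string that was passed to the table and write config.
Schema avroSchema = new Schema.Parser().parse(schema);

// Build one record carrying the complex column (field names are hypothetical).
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("uuid", "key-1");
rec.put("ts", 1L);
GenericRecord elem =
    new GenericData.Record(avroSchema.getField("items").schema().getElementType());
elem.put("c1", 1.0d);
elem.put("c2", 2L);
elem.put("c3", 3L);
rec.put("items", Collections.singletonList(elem));

List<HoodieRecord<DefaultHoodieRecordPayload>> records = Collections.singletonList(
    new HoodieAvroRecord<>(new HoodieKey("key-1", "partition-1"),
        new DefaultHoodieRecordPayload(rec, 1L)));

// Write through the Java engine context (no Spark/Flink on the write side).
HoodieJavaWriteClient<DefaultHoodieRecordPayload> client =
    new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String instant = client.startCommit();
client.upsert(records, instant);
client.close();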
2. Flink SQL cannot read the data back (the stack trace below is from the MergeOnRead read path) and throws the following errors:
java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_332]
    at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_332]
    at org.apache.parquet.schema.GroupType.getType(GroupType.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:514) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:480) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:406) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:147) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getReader(MergeOnReadInputFormat.java:306) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:177) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:81) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
2022-08-18 11:43:18,280 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job c2884af4f937a04ef4ae783dfa1e5a1c
2022-08-18 11:43:18,282 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2022-08-18 11:43:18,282 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2022-08-18 11:43:18,285 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (c2884af4f937a04ef4ae783dfa1e5a1c) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_332]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_332]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_332]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.6.jar:1.13.6]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.6.jar:1.13.6]
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_332]
    at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_332]
    at org.apache.parquet.schema.GroupType.getType(GroupType.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:514) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:480) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:406) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:147) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getReader(MergeOnReadInputFormat.java:306) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:177) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:81) ~[hudi-flink1.13-bundle-0.12.0.jar:0.12.0]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
2022-08-18 11:43:18,290 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (c2884af4f937a04ef4ae783dfa1e5a1c) switched from state FAILING to FAILED.
The table properties are as follows:
WITH
(
'connector' = 'hudi',
'hoodie.datasource.write.hive_style_partitioning'='false',
'table.type' = 'MERGE_ON_READ',
'path' = 'oss://**',
'hoodie.table.version'='4',
'hoodie.table.base.file.format'='parquet',
'read.tasks'='1',
'write.tasks'='1',
'write.bucket_assign.tasks'='1',
'compaction.tasks'='1'
);
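For completeness, the failing read can be reproduced from Flink's Table API; a minimal sketch against Flink 1.13, where the table and column names are hypothetical and the WITH clause is the subset shown above (the path stays elided as in the issue):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Hypothetical DDL; the complex column mirrors ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>>
// from step 1, and the connector options come from the table properties above.
tEnv.executeSql(
    "CREATE TABLE t1 (" +
    "  uuid STRING," +
    "  ts BIGINT," +
    "  items ARRAY<ROW<c1 DOUBLE, c2 BIGINT, c3 BIGINT>>," +
    "  part STRING" +
    ") PARTITIONED BY (part) WITH (" +
    "  'connector' = 'hudi'," +
    "  'table.type' = 'MERGE_ON_READ'," +
    "  'path' = 'oss://**'" +
    ")");

// Selecting the complex column triggers the IndexOutOfBoundsException shown above.
tEnv.executeSql("SELECT * FROM t1").print();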
Hudi versions 0.11.0 and 0.11.1 produce the same error.