QuChunhe opened a new issue, #6565:
URL: https://github.com/apache/hudi/issues/6565

   1. Environment: Hudi 0.12.0, Aliyun OSS file system, Flink 1.13.6.
   2. The Hudi Java client is configured with async clean enabled, as follows:
   
      // Initialize the MERGE_ON_READ table if it does not exist yet
      if (!fs.exists(path)) {
        HoodieTableMetaClient.withPropertyBuilder()
            .setBaseFileFormat(baseFileFormat)
            .setPartitionFields(partitionFields)
            //.setPreCombineField(preCombineField)
            .setHiveStylePartitioningEnable(false)
            .setTableCreateSchema(schema)
            .setTableType(HoodieTableType.MERGE_ON_READ)
            .setRecordKeyFields(recordKeyFields)
            .setPayloadClassName(DefaultHoodieRecordPayload.class.getName())
            .setTableName(tableName)
            .initTable(hadoopConf, tablePath);
      }

      // Create the write client used to write records
      HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
          .withPath(tablePath)
          .withAutoCommit(true)
          .withEmbeddedTimelineServerEnabled(false)
          .withRollbackUsingMarkers(false)
          .withBulkInsertParallelism(parallelism)
          .withSchema(schema)
          .withSchemaEvolutionEnable(enableSchemaEvolution)
          .withParallelism(parallelism, parallelism)
          .withDeleteParallelism(1)
          //.withEngineType(EngineType.SPARK)
          .forTable(tableName)
          .withMergeAllowDuplicateOnInserts(false)
          // Cleaning of the data table: automatic and asynchronous
          .withCleanConfig(HoodieCleanConfig.newBuilder()
              .withAutoClean(true)
              .withAsyncClean(true)
              .build())
          .withStorageConfig(
              HoodieStorageConfig.newBuilder()
                  .parquetWriteLegacyFormat("false")
                  .build())
          // Metadata table enabled, with async clean and async indexing
          .withMetadataConfig(
              HoodieMetadataConfig.newBuilder()
                  .withAsyncClean(true)
                  .withAsyncIndex(true)
                  .enable(true)
                  .build())
          .withConsistencyGuardConfig(
              ConsistencyGuardConfig.newBuilder()
                  .withEnableOptimisticConsistencyGuard(false)
                  .build())
          .withIndexConfig(
              HoodieIndexConfig.newBuilder()
                  .withIndexType(IndexType.BLOOM)
                  .build())
          .withCompactionConfig(
              HoodieCompactionConfig.newBuilder()
                  .withCompactionLazyBlockReadEnabled(true)
                  .build())
          .withMarkersType(MarkerType.DIRECT.name())
          .build();
      client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
   
   
   3. When the Java client writes data, the following error is thrown repeatedly:
   shadow.gs.org.apache.hudi.exception.HoodieException: Error waiting for async clean service to finish
           at shadow.gs.org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:77) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:613) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:537) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.HoodieJavaWriteClient.postWrite(HoodieJavaWriteClient.java:200) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.HoodieJavaWriteClient.insert(HoodieJavaWriteClient.java:130) ~[robot-stream.jar:?]
           at com.robot.gs.sink.HudiSink.invoke(HudiSink.java:111) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[robot-stream.jar:?]
           at com.robot.gs.process.BatchingDataInWindow.process(BatchingDataInWindow.java:18) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction.process(InternalIterableProcessAllWindowFunction.java:64) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction.process(InternalIterableProcessAllWindowFunction.java:33) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:434) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) [robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) [robot-stream.jar:?]
           at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [robot-stream.jar:?]
           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [robot-stream.jar:?]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
   Caused by: java.lang.InterruptedException
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[?:1.8.0_332]
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_332]
           at shadow.gs.org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:75) ~[robot-stream.jar:?]
           ... 30 more
   
   4. With async clean disabled, the errors disappear.
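The stack trace shows the failure path. HoodieJavaWriteClient.insert calls postCommit, which calls autoCleanOnCommit. That waits on the async cleaner through AsyncCleanerService.waitForCompletion and HoodieAsyncService.waitForShutdown, ending in CompletableFuture.get(). The root cause is an InterruptedException, which means the writing thread was interrupted while it was blocked in that wait.

The following is a hypothetical, simplified model of that path using only the JDK. It is not Hudi's code. The names AsyncCleanModel, CleanWaitException, waitForAsyncClean, cleanSynchronously, and commitOnInterruptedThread are invented for illustration. The model assumes the interrupt comes from the host runtime (for example, Flink interrupting a task thread), which the report does not confirm. It simulates the interrupt by setting the thread's interrupt flag before the wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical, simplified model of the call path in the stack trace:
// postCommit -> autoCleanOnCommit -> AsyncCleanerService.waitForCompletion
// -> HoodieAsyncService.waitForShutdown -> CompletableFuture.get().
// This is NOT Hudi's implementation; all names here are illustrative.
public class AsyncCleanModel {

    // Hypothetical stand-in for HoodieException.
    static class CleanWaitException extends RuntimeException {
        CleanWaitException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    // Async path: the writer thread blocks until the cleaner's future completes.
    static void waitForAsyncClean(CompletableFuture<Void> cleanerFuture) {
        try {
            // Throws InterruptedException if the calling thread is interrupted
            // before or while it waits for an incomplete future.
            cleanerFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new CleanWaitException("Error waiting for async clean service to finish", e);
        }
    }

    // Sync path: cleaning runs inline, so there is no future to block on.
    static void cleanSynchronously(Runnable clean) {
        clean.run();
    }

    // Simulates a commit on a thread whose interrupt flag is already set.
    // Returns the exception raised by the post-commit clean step, or null if none.
    static Throwable commitOnInterruptedThread(boolean asyncClean) {
        CompletableFuture<Void> cleanerStillRunning = new CompletableFuture<>(); // never completed
        Thread.currentThread().interrupt(); // simulated interrupt from the host runtime
        try {
            if (asyncClean) {
                waitForAsyncClean(cleanerStillRunning);
            } else {
                cleanSynchronously(() -> { /* clean work would run here */ });
            }
            return null;
        } catch (CleanWaitException e) {
            return e;
        } finally {
            Thread.interrupted(); // clear the flag so the demo leaves the thread clean
        }
    }

    public static void main(String[] args) {
        Throwable asyncResult = commitOnInterruptedThread(true);
        System.out.println("async clean: " + asyncResult + ", caused by " + asyncResult.getCause());
        System.out.println("sync clean:  " + commitOnInterruptedThread(false));
    }
}
```

In this model, only the asynchronous path blocks on a future. That fits step 4, where disabling async clean makes this particular error go away. The model does not show where the interrupt comes from. It also does not show that synchronous cleaning is unaffected by interrupts in general.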
