zjq888 opened a new issue, #10639:
URL: https://github.com/apache/hudi/issues/10639

   * Hudi version: 0.13.1

   * Flink version: 1.13
   
   Hudi Flink config:
     'connector' = 'hudi',
     'path' = 's3://bnb-datalake-hudi/**********',
     'table.type' = 'COPY_ON_WRITE',
     'write.batch.size' = '512',
     'write.tasks' = '4',
     'write.bucket_assign.tasks' = '4',
     'write.operation' = 'upsert',
     'write.task.max.size' = '4096',
     'write.merge.max_memory' = '3072',
     'write.precombine' = 'true',
     'precombine.field' = 'update_time',
     'hive_sync.enable' = 'true',
     'hive_sync.db' = '---',
     'hive_sync.table' = '---',
     'hive_sync.mode' = 'GLUE',
     'hive_sync.partition_fields' = 'date_key',
     'write.rate.limit' = '15000',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
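
   For reference, these options would normally sit in the WITH clause of a Flink SQL CREATE TABLE statement. The sketch below is only illustrative: the table name, column list, and record key are assumptions (they are not shown in this issue), while the WITH options mirror the config above.

     -- Illustrative sketch only: table name, columns, and primary key are assumed,
     -- not taken from this issue; the WITH options mirror the config listed above.
     CREATE TABLE hudi_sink (
       id          BIGINT,          -- hypothetical record key column
       update_time TIMESTAMP(3),    -- matches 'precombine.field' above
       date_key    STRING,          -- matches 'hive_sync.partition_fields' above
       PRIMARY KEY (id) NOT ENFORCED
     ) PARTITIONED BY (date_key) WITH (
       'connector' = 'hudi',
       'path' = 's3://bnb-datalake-hudi/**********',
       'table.type' = 'COPY_ON_WRITE',
       'write.operation' = 'upsert',
       'write.precombine' = 'true',
       'precombine.field' = 'update_time',
       'hive_sync.enable' = 'true',
       'hive_sync.partition_fields' = 'date_key',
       'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
       -- ... plus the remaining write.* / hive_sync.* options listed above
     );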
   
   The job starts and runs normally, but after running for a while the following error is reported (each sync reports it for more than 10 partitions).

   Exception trace during upsert:
   
   @timestamp 2024-02-06T20:38:41.222Z,
      + - app_id application_1678281252174_81026,
      + - app_name bnb_dwd,
      + - container_id container_e03_1678281252174_81026_01_000004,
      + - content org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor [] - Error upsetting bucketType UPDATE for partition :20240119
     com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AbortedException:
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:61) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.markSupported(SdkFilterInputStream.java:125) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.makeResettable(AmazonHttpClient.java:999) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.beforeRequest(AmazonHttpClient.java:966) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:807) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5453) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5400) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:421) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6528) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1861) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1821) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:35) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:10) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.AbstractUploadingS3Call.perform(AbstractUploadingS3Call.java:87) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:111) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:138) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:186) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.putObject(AmazonS3LiteClient.java:107) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultSinglePartUploadDispatcher.create(DefaultSinglePartUploadDispatcher.java:39) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.uploadSingleCompleteFile(S3FSOutputStream.java:386) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.doClose(S3FSOutputStream.java:225) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:201) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:415) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.FlinkMergeHandle.close(FlinkMergeHandle.java:172) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdateInternal(BaseFlinkCommitActionExecutor.java:227) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdate(BaseFlinkCommitActionExecutor.java:218) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:189) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:107) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:69) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor.execute(FlinkUpsertCommitActionExecutor.java:51) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:111) ~[hudi.jar:0.13.1]
      at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:147) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:189) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:472) ~[hudi.jar:0.13.1]
      at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:1.8.0_372]
      at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:464) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:136) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) ~[hudi.jar:0.13.1]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:60) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:782) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372],
      + - hostname ip-10-116-20-255,
      + - level ERROR,
      + - log_name taskmanager_bnb_dwd.fact_main_c2c_order_di_hudi.log,
      + - timestamp 2024-02-06 20:38:41,222
   }
