[ https://issues.apache.org/jira/browse/FLINK-31746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710214#comment-17710214 ]

Haoze Wu commented on FLINK-31746:
----------------------------------

[~martijnvisser] I reproduced a similar issue in Flink-1.17.0. It has a 
similar "Sink Committer (1/1)#0" stage, and the same exception can cause 
this issue.

> Batch workload output completes while the job client fails
> ----------------------------------------------------------
>
>                 Key: FLINK-31746
>                 URL: https://issues.apache.org/jira/browse/FLINK-31746
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink-1.14.0 (we know 1.14.0 is no longer supported, so we 
> are also testing Flink-1.17.0 to see whether it has the same issue). We run 
> a batch processing job whose input is a file on disk and whose output is a 
> Kafka topic, which should receive 170 messages when the workload finishes. 
> During testing, we inject a fault (an IOException) into a taskmanager, 
> after which the batch processing job client fails:
> {code:java}
> 2023-03-26T19:05:48,922 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 85c9bd56d6dd111f858b4b5a99551c53) {code}
> The IOException occurs in 
> `BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader` when it 
> calls `FileChannel.open`; there are multiple points in a workload at which 
> it can occur.
> {code:java}
>     FileRegionReader(Path filePath) throws IOException {
>         this.fileChannel = FileChannel.open(filePath, 
> StandardOpenOption.READ);
>         this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
>     }
>  {code}
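Outside Flink, the failure mode in this constructor is easy to reproduce: `FileChannel.open` reports any problem with the underlying file as an `IOException` (for a missing path, the `NoSuchFileException` subclass). A minimal sketch, with a hypothetical `openForRead` helper standing in for the constructor above:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileChannelOpenDemo {
    // Hypothetical helper mirroring the constructor above: any failure in
    // FileChannel.open propagates to the caller as an IOException.
    static FileChannel openForRead(Path filePath) throws IOException {
        return FileChannel.open(filePath, StandardOpenOption.READ);
    }

    public static void main(String[] args) {
        Path missing = Paths.get("/tmp/does-not-exist-" + System.nanoTime());
        try (FileChannel ch = openForRead(missing)) {
            System.out.println("opened: " + ch);
        } catch (IOException e) {
            // NoSuchFileException is a subclass of IOException
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```

Once such an IOException escapes `createReadView`, the task transitions to FAILED; with `NoRestartBackoffTimeStrategy` the whole job then fails.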
> The call stack of this fault site:
> {code:java}
> (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader,<init>,200),
> (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader,<init>,74),
> (org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition,createReadView,221),
> (org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition,createSubpartitionView,205),
> (org.apache.flink.runtime.io.network.partition.ResultPartitionManager,createSubpartitionView,76),
> (org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel,requestSubpartition,133),
> (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,internalRequestPartitions,330),
> (org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,requestPartitions,299),
> (org.apache.flink.runtime.taskmanager.InputGateWithMetrics,requestPartitions,127),
> (org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1,runThrowing,50),
> (org.apache.flink.streaming.runtime.tasks.mailbox.Mail,run,90),
> (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMailsNonBlocking,353),
> (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMail,319),
> (org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,runMailboxLoop,201),
> (org.apache.flink.streaming.runtime.tasks.StreamTask,runMailboxLoop,809),
> (org.apache.flink.streaming.runtime.tasks.StreamTask,invoke,761),
> (org.apache.flink.runtime.taskmanager.Task,runWithSystemExitMonitoring,958),
> (org.apache.flink.runtime.taskmanager.Task,restoreAndInvoke,937),
> (org.apache.flink.runtime.taskmanager.Task,doRun,766),
> (org.apache.flink.runtime.taskmanager.Task,run,575),
> (java.lang.Thread,run,748) {code}
> Inspecting the names of the threads where the fault occurs, we find that 
> our workload is divided into the following tasks:
> Split Reader: Custom File Source -> Flat Map (1/8)#0
> ...
> Split Reader: Custom File Source -> Flat Map (8/8)#0
> Keyed Aggregation -> Map -> Sink Unnamed Writer (1/8)#0
> ...
> Keyed Aggregation -> Map -> Sink Unnamed Writer (8/8)#0
> Sink Unnamed Committer (1/1)#0
>  
> A fault during “Split Reader” or “Keyed Aggregation” triggers this “Job 
> failed” message, and our Kafka topic does not receive the complete, correct 
> output (i.e., fewer than 170 messages). However, if the exception happens 
> during “Sink Unnamed Committer”, the client still reports “Job failed” even 
> though our Kafka topic has already received everything it should.
> We assume that our workload is translated into a few stages: “Custom File 
> Source -> Flat Map”, “Keyed Aggregation -> Map -> Sink Unnamed Writer”, and 
> “Sink Unnamed Committer”. The last one is responsible for some “commit” 
> work and does not affect our end-to-end results. However, a fault in the 
> “commit” stage still reports a “failure” to the job client, which may 
> confuse it.
> We have some questions about the design rationale:
>  # In some workloads such as ours, the final “commit” does not seem to 
> matter that much. Can a failure there be treated as tolerable?
>  # The client log is confusing. It shows tons of exceptions, but it does 
> not show in which stage of the workload the failure happened. The most 
> useful piece of information for the client would be something like “Sink 
> Unnamed Committer (1/1)#0 (7b19f0a2f247b8f38fe9141c9872ef58) switched from 
> RUNNING to FAILED”, which is not shown.
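As a client-side workaround for question 2, one can at least surface the innermost cause instead of the wall of wrappers: walk the cause chain of the exception thrown by `execute()` down to its root. A minimal sketch; the exception chain built in `main` is hypothetical and only mimics the nesting seen in the client log:

```java
import java.io.IOException;

public class RootCauseDemo {
    // Walk the cause chain to its end; the self-reference check guards
    // against a Throwable whose cause points back to itself.
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    public static void main(String[] args) {
        // Hypothetical chain mimicking the log's wrapping:
        // "Job failed" -> "Job execution failed." -> IOException
        Throwable chain = new RuntimeException("Job failed",
                new RuntimeException("Job execution failed.",
                        new IOException("injected fault in Sink Committer")));
        System.out.println(rootCause(chain));
    }
}
```

Applied to the exception in the P.S. log, this would surface the `java.io.IOException` from `FileRegionReader` directly, though it still cannot name the failing stage.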
> P.S. The complete failure log of the job client is:
> {code:java}
> 2023-04-03T11:36:25,464 ERROR cli.CliFrontend (CliFrontend.java:handleError(923)) - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.0.jar:1.14.0]
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayBatchClientMain.run(FlinkGrayBatchClientMain.java:69) ~[?:?]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.run(FlinkGrayClientMain.java:66) ~[?:?]
>         at edu.jhu.order.mcgray.fl_1_14_0.FlinkGrayClientMain.main(FlinkGrayClientMain.java:92) ~[?:?]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8a169709de74948b5a9fed7d52c13f8d)
>         at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_221]
>         at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) ~[?:1.8.0_221]
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[?:?]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[?:?]
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
>         at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
>         at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
>         at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_221]
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_221]
> Caused by: java.io.IOException
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:229) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader.<init>(BoundedBlockingSubpartitionDirectTransferReader.java:82) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:226) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.createSubpartitionView(BufferWritingResultPartition.java:209) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:76) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:133) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:330) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:299) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:127) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:358) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:322) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221] {code}
> We feel that the job client should probably improve its logging by adding 
> more details about the failure, such as information about the “Sink Unnamed 
> Committer” stage.
> We are also checking Flink-1.17.0 to see whether it has this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)