[ 
https://issues.apache.org/jira/browse/FLINK-23117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427125#comment-17427125
 ] 

Gabriel Nes commented on FLINK-23117:
-------------------------------------

It seems that we hit this problem in our production environment:

* Flink 1.13.1;
* 1 job manager (without HA);
* 7 task managers with 12 slots each;

The exception stack trace goes like this:
```
org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://[email protected]:6122/user/rpc/taskmanager_0 has no more allocated slots for job 2525adf1793959215a2713aeefef13e6.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1941)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1922)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1955)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$3000(TaskExecutor.java:181)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:2313)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
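
The top frames of the trace read as a sequence: a slot times out (`timeoutSlot`), it is freed (`freeSlotInternal`), and because it was the job's last slot on this TaskExecutor, the JobManager connection is closed (`closeJobManagerConnectionIfNoAllocatedResources`) with the exception above. The following is a hypothetical, heavily simplified Java model of that sequence. `SlotTimeoutModel` and its methods are illustrative names, not Flink APIs, and the real method's name suggests it may consider other per-job resources besides slots; this model tracks slots only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of: timeoutSlot -> freeSlotInternal ->
// closeJobManagerConnectionIfNoAllocatedResources. NOT Flink code.
public class SlotTimeoutModel {
    // jobId -> allocation IDs that job still holds on this TaskExecutor
    private final Map<String, Set<String>> allocationsPerJob = new HashMap<>();
    // allocationId -> owning jobId
    private final Map<String, String> owner = new HashMap<>();

    void allocate(String jobId, String allocationId) {
        allocationsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(allocationId);
        owner.put(allocationId, jobId);
    }

    // Simulates a slot timeout; returns the disconnect reason if the freed
    // slot was the job's last one, otherwise Optional.empty().
    Optional<String> timeoutSlot(String allocationId) {
        return freeSlotInternal(allocationId);
    }

    private Optional<String> freeSlotInternal(String allocationId) {
        String jobId = owner.remove(allocationId);
        if (jobId == null) {
            return Optional.empty(); // unknown allocation: nothing to free
        }
        allocationsPerJob.get(jobId).remove(allocationId);
        return closeJobManagerConnectionIfNoAllocatedResources(jobId);
    }

    private Optional<String> closeJobManagerConnectionIfNoAllocatedResources(String jobId) {
        if (allocationsPerJob.get(jobId).isEmpty()) {
            allocationsPerJob.remove(jobId); // job has nothing left here
            return Optional.of("TaskExecutor has no more allocated slots for job " + jobId + ".");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SlotTimeoutModel te = new SlotTimeoutModel();
        te.allocate("job-1", "alloc-a");
        te.allocate("job-1", "alloc-b");
        System.out.println(te.timeoutSlot("alloc-a")); // Optional.empty
        System.out.println(te.timeoutSlot("alloc-b")); // Optional[TaskExecutor has no more allocated slots for job job-1.]
    }
}
```

In this model, only the timeout of the job's last remaining slot produces the disconnect message, which matches the order of frames in the trace.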


The following screenshot shows the task manager's state at the time of the 
error (plenty of free slots):
!image-2021-10-11-10-23-14-069.png|width=769,height=441!

> TaskExecutor.allocateSlot is a logical error
> --------------------------------------------
>
>                 Key: FLINK-23117
>                 URL: https://issues.apache.org/jira/browse/FLINK-23117
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.0, 1.12.2, 1.13.0, 1.13.1
>            Reporter: zhouzhengde
>            Priority: Minor
>         Attachments: image-2021-10-11-10-11-28-390.png, 
> image-2021-10-11-10-16-17-977.png, image-2021-10-11-10-23-14-069.png
>
>
> (commit: 2020-04-22) TaskExecutor.allocateSlot at line 1109 has a logical 
> error. Using '!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, 
> allocationId)' to decide that the TaskSlot is used by another job is not 
> correct: if the slot index is not occupied, this will cause problems. Please 
> confirm whether this is correct. The code in question follows: 
> - TaskExecutor.java
> ```java
> // Quoted fragment; the preceding if-branch is not shown.
> // The condition questioned in this issue:
> } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
>     final String message =
>             "The slot " + slotId + " has already been allocated for a different job.";
>     log.info(message);
>     final AllocationID allocationID =
>             taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
>     throw new SlotOccupiedException(
>             message, allocationID, taskSlotTable.getOwningJob(allocationID));
> }
> ```
> - TaskSlotTableImpl.java
> ```java
>     @Override
>     public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
>         TaskSlot<T> taskSlot = taskSlots.get(index);
>         if (taskSlot != null) {
>             return taskSlot.isAllocated(jobId, allocationId);
>         } else {
>             return false;
>         }
>     }
> ```
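
The reporter's point can be illustrated with a small model. `!isAllocated(...)` is true whenever the slot at that index is not held by exactly this job and allocation, so it covers an empty index and a different allocation of the same job, not only a genuinely different job. The sketch below is hypothetical: `IsAllocatedModel`, `Slot`, `negatedIsAllocated` and `ownedByOtherJob` are illustrative names, not Flink APIs, and slot state is ignored. Whether an empty index can actually reach this branch depends on the preceding `if` in `allocateSlot`, which the quoted fragment does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the quoted check; NOT Flink code. IDs are plain
// Strings and slot state (allocated/active/releasing) is ignored.
public class IsAllocatedModel {

    // Stand-in for a TaskSlot: the job and allocation that own it.
    static final class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }

        boolean isAllocated(String job, String alloc) {
            return jobId.equals(job) && allocationId.equals(alloc);
        }
    }

    private final Map<Integer, Slot> taskSlots = new HashMap<>();

    void addSlot(int index, String jobId, String allocationId) {
        taskSlots.put(index, new Slot(jobId, allocationId));
    }

    // Mirrors the quoted TaskSlotTableImpl.isAllocated: no slot at index -> false.
    boolean isAllocated(int index, String jobId, String allocationId) {
        Slot slot = taskSlots.get(index);
        return slot != null && slot.isAllocated(jobId, allocationId);
    }

    // The condition used by the quoted allocateSlot branch.
    boolean negatedIsAllocated(int index, String jobId, String allocationId) {
        return !isAllocated(index, jobId, allocationId);
    }

    // The condition the log message describes: a slot exists and another job owns it.
    boolean ownedByOtherJob(int index, String jobId) {
        Slot slot = taskSlots.get(index);
        return slot != null && !slot.jobId.equals(jobId);
    }

    String describe(int index, String jobId, String allocationId) {
        return String.format("index=%d job=%s alloc=%s -> !isAllocated=%b, ownedByOtherJob=%b",
                index, jobId, allocationId,
                negatedIsAllocated(index, jobId, allocationId),
                ownedByOtherJob(index, jobId));
    }

    public static void main(String[] args) {
        IsAllocatedModel table = new IsAllocatedModel();
        table.addSlot(0, "job-A", "alloc-1");
        table.addSlot(2, "job-B", "alloc-2"); // index 1 is left empty
        System.out.println(table.describe(0, "job-A", "alloc-1")); // same job, same allocation
        System.out.println(table.describe(0, "job-A", "alloc-9")); // same job, other allocation
        System.out.println(table.describe(1, "job-A", "alloc-3")); // empty index
        System.out.println(table.describe(2, "job-A", "alloc-4")); // different job
    }
}
```

Running `main` prints `!isAllocated=true` for the last three cases, but `ownedByOtherJob=true` only for the last one, which is the mismatch between the condition and the "allocated for a different job" message that the issue describes.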



--
This message was sent by Atlassian Jira
(v8.3.4#803005)