[ 
https://issues.apache.org/jira/browse/FLINK-32133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-32133:
-------------------------------
    Description: 
h1. Problem

When the initial job requests many containers from yarn, it is easy to get more 
than needed containers for that the YARN AM-RM protocol is not a delta protocol 
(please see YARN-1902). For example, we are needing 3000 containers. Consider 
the following case. 

*Case one:*
 # The job requests 2000 containers firstly and then the yarn client has 2000 
requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 2000 
containers+ to yarn rm.{color}
 # The job requests another 1000 containers and the the yarn client has 3000 
requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 3000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 1000 requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 1000 
containers+ to yarn rm. {color}
 # On heartbeat finish, yarn rm {+}returns 3000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 0 requests.
 # {color:#FF0000}The yarn heartbeat happens.{color}
 # On heartbeat finish, yarn rm +returns 1000 containers+{color:#FF0000} 
{color:#172b4d}which are excess since the last client request number is 
1000.{color}{color}

{color:#172b4d}In the end, the yarn may allocate 2000 + 3000 + 1000 = 6000 
containers. But we only need 3000 containers and should return 3000 
containers.{color}

*{color:#172b4d}Case two:{color}*
 # {color:#172b4d}The job requests 3000 containers firstly and the the yarn 
client has 3000 requests.{color}
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 3000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 1000 containers({+}2000 
allocating{+}){+}. After the callback the method onContainersAllocated and 
removeContainerRequest, yarn client has 2000 requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 2000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 0 requests.
 # {color:#FF0000}The yarn heartbeat happens.{color}
 # On heartbeat finish, yarn rm +returns 2000 containers+{color:#FF0000} 
{color:#172b4d}which are excess since the last client request number is 
2000.{color}{color}

{color:#172b4d}In the end, the yarn may allocate 1000 + 2000 + 2000 = 5000 
containers. But we only need 3000 containers and should return 2000 
containers.{color}

{color:#172b4d}The reason is that any update to the yarn client's requests may 
produce undesired behavior. {color}
h1. {color:#172b4d}Solution{color}

{color:#172b4d}In our inner flink version, we use two ways to resolve the 
problem as following:{color}
 # {color:#172b4d}{color:#172b4d}Compute the total resource requests at start 
and request by batch{color}{color}{color:#172b4d} to avoid being interrupted by 
yarn heartbeat. Note that we {color}{color:#172b4d}{color:#172b4d}loop 
{color}{color}resourceManagerClient.addContainerRequest(containerRequest){color:#172b4d})
 to simulate batch-request quickly.{color}
 # {color:#172b4d}Remove the yarn client's container requests after receiving 
enough resources to avoid request update.{color}

  was:
h1. Problem

When the initial job requests many containers from yarn, it is easy to get more 
than needed containers for that the YARN AM-RM protocol is not a delta protocol 
(please see YARN-1902). For example, we are needing 3000 containers. Consider 
the following case. 

*Case one:*
 # The job requests 2000 containers firstly and the the yarn client has 2000 
requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 2000 
containers+ to yarn rm.{color}
 # The job requests another 1000 containers and the the yarn client has 3000 
requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 3000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 1000 requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 1000 
containers+ to yarn rm. {color}
 # On heartbeat finish, yarn rm {+}returns 3000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 0 requests.
 # {color:#FF0000}The yarn heartbeat happens.{color}
 # On heartbeat finish, yarn rm +returns 1000 containers+{color:#FF0000} 
{color:#172b4d}which are excess since the last client request number is 
1000.{color}{color}

{color:#172b4d}In the end, the yarn may allocate 2000 + 3000 + 1000 = 6000 
containers. But we only need 3000 containers and should return 3000 
containers.{color}

*{color:#172b4d}Case two:{color}*
 # {color:#172b4d}The job requests 3000 containers firstly and the the yarn 
client has 3000 requests.{color}
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 3000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 1000 containers({+}2000 
allocating{+}){+}. After the callback the method onContainersAllocated and 
removeContainerRequest, yarn client has 2000 requests.
 # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 2000 
containers+ to yarn rm.{color}
 # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
callback the method onContainersAllocated and removeContainerRequest, yarn 
client has 0 requests.
 # {color:#FF0000}The yarn heartbeat happens.{color}
 # On heartbeat finish, yarn rm +returns 2000 containers+{color:#FF0000} 
{color:#172b4d}which are excess since the last client request number is 
2000.{color}{color}

{color:#172b4d}In the end, the yarn may allocate 1000 + 2000 + 2000 = 5000 
containers. But we only need 3000 containers and should return 2000 
containers.{color}

{color:#172b4d}The reason is that any update to the yarn client's requests may 
produce undesired behavior. {color}
h1. {color:#172b4d}Solution{color}

{color:#172b4d}In our inner flink version, we use two ways to resolve the 
problem as following:{color}
 # {color:#172b4d}{color:#172b4d}Compute the total resource requests at start 
and request by batch{color}{color}{color:#172b4d} to avoid being interrupted by 
yarn heartbeat. Note that we {color}{color:#172b4d}{color:#172b4d}loop 
{color}{color}resourceManagerClient.addContainerRequest(containerRequest){color:#172b4d})
 to simulate batch-request quickly.{color}
 # {color:#172b4d}Remove the yarn client's container requests after receiving 
enough resources to avoid request update.{color}


> Batch requests and remove requests in the end to reduce YarnResourceManager's 
> excess containers
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32133
>                 URL: https://issues.apache.org/jira/browse/FLINK-32133
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / YARN
>    Affects Versions: 1.18.0
>            Reporter: Liu
>            Priority: Major
>
> h1. Problem
> When the initial job requests many containers from yarn, it is easy to get 
> more than needed containers for that the YARN AM-RM protocol is not a delta 
> protocol (please see YARN-1902). For example, we are needing 3000 containers. 
> Consider the following case. 
> *Case one:*
>  # The job requests 2000 containers firstly and then the yarn client has 2000 
> requests.
>  # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 
> 2000 containers+ to yarn rm.{color}
>  # The job requests another 1000 containers and the the yarn client has 3000 
> requests.
>  # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 
> 3000 containers+ to yarn rm.{color}
>  # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
> callback the method onContainersAllocated and removeContainerRequest, yarn 
> client has 1000 requests.
>  # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 
> 1000 containers+ to yarn rm. {color}
>  # On heartbeat finish, yarn rm {+}returns 3000 containers{+}. After the 
> callback the method onContainersAllocated and removeContainerRequest, yarn 
> client has 0 requests.
>  # {color:#FF0000}The yarn heartbeat happens.{color}
>  # On heartbeat finish, yarn rm +returns 1000 containers+{color:#FF0000} 
> {color:#172b4d}which are excess since the last client request number is 
> 1000.{color}{color}
> {color:#172b4d}In the end, the yarn may allocate 2000 + 3000 + 1000 = 6000 
> containers. But we only need 3000 containers and should return 3000 
> containers.{color}
> *{color:#172b4d}Case two:{color}*
>  # {color:#172b4d}The job requests 3000 containers firstly and the the yarn 
> client has 3000 requests.{color}
>  # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 
> 3000 containers+ to yarn rm.{color}
>  # On heartbeat finish, yarn rm {+}returns 1000 containers({+}2000 
> allocating{+}){+}. After the callback the method onContainersAllocated and 
> removeContainerRequest, yarn client has 2000 requests.
>  # {color:#FF0000}The yarn heartbeat happens and the yarn client +request 
> 2000 containers+ to yarn rm.{color}
>  # On heartbeat finish, yarn rm {+}returns 2000 containers{+}. After the 
> callback the method onContainersAllocated and removeContainerRequest, yarn 
> client has 0 requests.
>  # {color:#FF0000}The yarn heartbeat happens.{color}
>  # On heartbeat finish, yarn rm +returns 2000 containers+{color:#FF0000} 
> {color:#172b4d}which are excess since the last client request number is 
> 2000.{color}{color}
> {color:#172b4d}In the end, the yarn may allocate 1000 + 2000 + 2000 = 5000 
> containers. But we only need 3000 containers and should return 2000 
> containers.{color}
> {color:#172b4d}The reason is that any update to the yarn client's requests 
> may produce undesired behavior. {color}
> h1. {color:#172b4d}Solution{color}
> {color:#172b4d}In our inner flink version, we use two ways to resolve the 
> problem as following:{color}
>  # {color:#172b4d}{color:#172b4d}Compute the total resource requests at start 
> and request by batch{color}{color}{color:#172b4d} to avoid being interrupted 
> by yarn heartbeat. Note that we {color}{color:#172b4d}{color:#172b4d}loop 
> {color}{color}resourceManagerClient.addContainerRequest(containerRequest){color:#172b4d})
>  to simulate batch-request quickly.{color}
>  # {color:#172b4d}Remove the yarn client's container requests after receiving 
> enough resources to avoid request update.{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to