[ 
https://issues.apache.org/jira/browse/TEZ-4213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174285#comment-17174285
 ] 

Panagiotis Garefalakis edited comment on TEZ-4213 at 8/10/20, 12:41 PM:
------------------------------------------------------------------------

Hey [~ashutoshc] thanks for the comments!

{quote}can 
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html]
 be used instead here?{quote}

The idea here is to to replace an offer() (non blocking call) with a put() call 
"that is waiting if necessary for resources to become available".
Unfortunately to do that we have to use our own modified Queue implementation.

{quote}could be int queue_size = thread_count << 2; since adding to queue would 
be much faster, having larger queue is better.{quote}
Sure, fixed.

{quote} lets use config for max thread count. For min we can use : min(4, 
thread_count) since better to start with lower thread count, in case they are 
not needed.{quote}
A limitation of the above approach (blocking in the Queue when reaching the 
limit) is that *corePoolSize* should be equal to *maxPoolSize*. 
The reason being ThreadPoolExecutor depends on failed offers to add threads 
beyond the core pool size. see https://stackoverflow.com/a/4522411

Another approach could be to use the default Queues and implement you own 
RejectedExecutionHandler -- as this approach basically blocks the calling 
(submit) thread I find it less elegant

If we think that the appContext executor should remain unbounded -- and 
inputInializers are the most error prone code part (as they populate 
directories depending on partitions)  
we could just put the inputInitialization Futures into a BlockingQueue with a 
maximum size to tackle that 
*https://github.com/apache/tez/blob/9d2b61b576a2421ec4fb813489d896d2b89fcce9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java#L127*


was (Author: pgaref):
Hey [~ashutoshc] thanks for the comments!

{quote}can 
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html]
 be used instead here?{quote}

The idea here is to to replace an offer() (non blocking call) with a put() call 
"that is waiting if necessary for resources to become available".
Unfortunately to do that we have to use our own modified Queue implementation.

{quote}could be int queue_size = thread_count << 2; since adding to queue would 
be much faster, having larger queue is better.{quote}
Sure, fixed.

{quote} lets use config for max thread count. For min we can use : min(4, 
thread_count) since better to start with lower thread count, in case they are 
not needed.{quote}
A limitation of the above approach (blocking in the Queue when reaching the 
limit) is that *corePoolSize* should be equal to *maxPoolSize*. 
The reason being ThreadPoolExecutor depends on failed offers to add threads 
beyond the core pool size. see https://stackoverflow.com/a/4522411

Another approach could be to use the default Queues and implement you own 
RejectedExecutionHandler -- as this approach basically blocks the calling 
(submit) thread I find it less elegant

If we think that the appContext executor should still be unbounded -- and 
inputInializers are the most prone code part (as they populate directories 
depending on partitions)  
we could just put the inputInitialization Futures into a BlockingQueue with a 
maximum size to tackle that 
*https://github.com/apache/tez/blob/9d2b61b576a2421ec4fb813489d896d2b89fcce9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java#L127*

> Bound appContext executor capacity using a configurable property
> ----------------------------------------------------------------
>
>                 Key: TEZ-4213
>                 URL: https://issues.apache.org/jira/browse/TEZ-4213
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Panagiotis Garefalakis
>            Assignee: Panagiotis Garefalakis
>            Priority: Major
>         Attachments: TEZ-4213.01.patch, TEZ-4213.02.patch, TEZ-4213.03.patch, 
> TEZ-4213.04.patch, TEZ-4213.05.patch, TEZ-4213.06.patch
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> After TEZ-4170 was merged, appContext executor pool is also used by the 
> RootInputInitializerManager to speed up SplitGeneration.
> However, this executor pool currently has not capacity limit 
> https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java#L624
> The problem the occurs when generating splits for larger inputs (thousands or 
> more) is that it can could result to
> {color:red}java.lang.OutOfMemoryError{color}
> that is also reproducible with a test case.
> https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java#L130
> To avoid such errors, I propose to limit the capacity of this pool to a 
> configurable value that can be for example the number of physical cores by 
> default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to