[ 
https://issues.apache.org/jira/browse/SAMZA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16827149#comment-16827149
 ] 

Bharath Kumarasubramanian commented on SAMZA-2172:
--------------------------------------------------

Chosen solution: make AsyncRunLoop dispatch messages to a thread pool instead of 
using the current single-threaded dispatch.
 * Single-threaded dispatch assumes that processAsync() returns immediately. 
That isn't the case for StreamOperatorTask: its non-async operators run 
synchronously inside processAsync() and can take a significant amount of time.
 * Dispatching multiple messages concurrently through an executor in AsyncRunLoop 
requires taskInstance.process() to be thread safe. Changes were made to make it 
thread safe, including ensuring that the offsetManager used within process() is 
thread safe.
 * However, this has the side effect of potentially breaking ordered delivery of 
messages to a task when job.container.thread.pool.size > 1. A follow-up backlog 
item intends to provide stronger, per-key ordering guarantees to tasks.
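The dispatch change can be pictured with a minimal plain-Java sketch, assuming a simplified model rather than Samza's actual code: `PoolDispatchSketch`, `ThreadSafeOffsets` and the partition name `partition-0` are hypothetical, and a fixed `ExecutorService` stands in for the pool sized by job.container.thread.pool.size. The dispatching thread only enqueues work, shared offset state is updated with `ConcurrentHashMap.merge` so concurrent calls are safe, and the returned processing order shows why ordering only holds with a single-threaded pool.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for thread-safe offset bookkeeping (not Samza's OffsetManager).
// Once process() can run on several pool threads at once, updates must be atomic.
final class ThreadSafeOffsets {
  private final ConcurrentHashMap<String, Long> highestByPartition = new ConcurrentHashMap<>();

  void update(String partition, long offset) {
    // merge() is atomic per key, so concurrent callers cannot lose an update.
    highestByPartition.merge(partition, offset, Math::max);
  }

  Long highest(String partition) {
    return highestByPartition.get(partition);
  }
}

// Hypothetical sketch of pool-based dispatch (not Samza's AsyncRunLoop).
final class PoolDispatchSketch {
  // Submits offsets 0..count-1 of one partition to a fixed pool of poolSize threads,
  // standing in for job.container.thread.pool.size, and returns the order in which
  // the handler actually ran.
  static List<Long> dispatch(int poolSize, int count, ThreadSafeOffsets offsets) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    List<Long> processedOrder = Collections.synchronizedList(new ArrayList<>());
    for (long offset = 0; offset < count; offset++) {
      final long current = offset;
      // The dispatching ("run loop") thread only enqueues work; a slow synchronous
      // operator would block a pool thread rather than the dispatcher.
      pool.execute(() -> {
        processedOrder.add(current);
        offsets.update("partition-0", current);
      });
    }
    pool.shutdown(); // already-queued tasks still run to completion
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("dispatch did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for dispatch", e);
    }
    return new ArrayList<>(processedOrder);
  }
}
```

With `dispatch(1, 5, offsets)` a single worker drains a FIFO queue, so the order is `[0, 1, 2, 3, 4]`; with a larger pool the order can differ from run to run, which is the ordering side effect noted in the last bullet.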

Rejected alternatives: 
 # Have an executor within StreamOperatorTask, similar to AsyncStreamTaskAdapter, 
that dispatches messages to the thread pool. This requires sharing the executor 
created in SamzaContainer, the same way we do in the finalizeTaskFactory method.
 ** It is a hack tailored to StreamOperatorTask that introduces type-specific checks.
 ** It doesn't solve the problem of a potentially blocking AsyncStreamTask, since 
dispatch still happens on the run loop thread.
 # Instead of wrapping the operator calls with 
CompletableFuture.completedFuture(..), use CompletableFuture.supplyAsync(() -> ...)
 ** Runs on Java's common ForkJoinPool by default
 ** Is not tied to our existing job.container.thread.pool.size setting
 ** Introduces a disparity between the low-level and high-level APIs
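To illustrate the second rejected alternative: per the `CompletableFuture` documentation, `supplyAsync(supplier)` without an executor runs on `ForkJoinPool.commonPool()` (or on a new thread per task if the common pool's parallelism is below two), while the two-argument overload runs on the executor passed in. In this sketch, `SupplyAsyncPools` and its thread-name prefix are hypothetical, and the fixed pool merely stands in for one sized by job.container.thread.pool.size.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SupplyAsyncPools {
  // Hypothetical prefix marking threads of a pool sized like job.container.thread.pool.size.
  static final String PREFIX = "container-pool-";

  // A fixed pool whose size we control, with recognizable thread names.
  static ExecutorService containerPool(int size) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = r -> new Thread(r, PREFIX + counter.incrementAndGet());
    return Executors.newFixedThreadPool(size, factory);
  }

  // Stand-in for a synchronous operator; reports which thread executed it.
  static String operatorThread() {
    return Thread.currentThread().getName();
  }

  // Rejected alternative: no executor argument, so the work runs on the common
  // ForkJoinPool (or a fresh thread), regardless of any configured pool size.
  static String viaDefaultPool() {
    return CompletableFuture.supplyAsync(SupplyAsyncPools::operatorThread).join();
  }

  // Passing an executor ties the work to a pool whose size comes from configuration.
  static String viaContainerPool(ExecutorService pool) {
    return CompletableFuture.supplyAsync(SupplyAsyncPools::operatorThread, pool).join();
  }
}
```

Only the explicit-executor variant runs on threads carrying the configured prefix; the common pool's size is derived from the number of available processors unless overridden by the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.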

> Async High Level API does not schedule StreamOperatorTasks on separate threads
> ------------------------------------------------------------------------------
>
>                 Key: SAMZA-2172
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2172
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Prateek Maheshwari
>            Assignee: Bharath Kumarasubramanian
>            Priority: Major
>
> With the Async High Level API changes, we changed StreamOperatorTask from a 
> StreamTask to an AsyncStreamTask. As a result, instead of being wrapped in an 
> AsyncStreamTaskAdapter, its processAsync() is invoked directly on the run 
> loop. This means that job.container.thread.pool.size has no effect, and 
> the entire DAG (outside of asyncFlatMap) is executed on the run loop thread, 
> one TaskInstance at a time. 
> {code:java}
> ...
> at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:109)
> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:176)
> at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:174)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:470)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:412)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:346)
> at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:233)
> at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:763)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:150)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:78)
> at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:76)
> {code}
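The difference described in the issue, between wrapping a synchronous task in an adapter and invoking a synchronous body directly through processAsync(), can be illustrated with a small sketch. `SyncTask`, `AsyncTask` and `DispatchShapes` are hypothetical simplifications, not Samza's StreamTask, AsyncStreamTask or AsyncStreamTaskAdapter (the real interfaces take envelopes, collectors and callbacks); each call simply reports the name of the thread the work ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical, simplified task shapes (not Samza's StreamTask / AsyncStreamTask).
interface SyncTask {
  String process(String message);
}

interface AsyncTask {
  CompletableFuture<String> processAsync(String message);
}

final class DispatchShapes {
  // Synchronous "operator" that reports the thread it ran on.
  static final SyncTask RECORD_THREAD = message -> Thread.currentThread().getName();

  // Adapter-style wrapping: the synchronous work is handed to a pool, so the
  // caller (the run loop) gets a future back without waiting for the work.
  static AsyncTask adapt(SyncTask task, ExecutorService pool) {
    return message -> CompletableFuture.supplyAsync(() -> task.process(message), pool);
  }

  // An async task whose body is synchronous: all of the work runs on the caller's
  // thread before an already-completed future is returned.
  static AsyncTask inline(SyncTask task) {
    return message -> CompletableFuture.completedFuture(task.process(message));
  }
}
```

`inline(...)` reports the caller's own thread, mirroring the run loop executing the DAG itself, while `adapt(...)` reports a pool thread.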



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
