[
https://issues.apache.org/jira/browse/SAMZA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16827149#comment-16827149
]
Bharath Kumarasubramanian commented on SAMZA-2172:
--------------------------------------------------
Chosen solution: make AsyncRunLoop dispatch messages to a thread pool instead of
the current single-threaded dispatch.
* Single-threaded dispatch assumes that processAsync() returns immediately. That
is not the case for StreamOperatorTask, because it executes non-async operators
synchronously on the calling thread, and these can take a long time.
* Dispatching concurrently through an executor in AsyncRunLoop requires
taskInstance.process() to be thread-safe. I made it thread-safe and ensured that
the offsetManager used within process() is thread-safe as well.
* However, this can break the ordered delivery of messages to a task when
job.container.thread.pool.size > 1. We have a follow-up backlog item to provide
stronger ordering guarantees to tasks at a per-key level.
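A minimal sketch of the chosen approach, using plain java.util.concurrent and hypothetical class names (PooledDispatchSketch, CountingTracker) that are not Samza's actual AsyncRunLoop code: the run loop thread only submits work, a fixed pool of poolSize threads (standing in for job.container.thread.pool.size) executes it, and any state shared across messages, such as offset bookkeeping, must be thread-safe.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Samza's AsyncRunLoop: the "run loop" thread only submits
// work, and a fixed pool of poolSize threads (standing in for
// job.container.thread.pool.size) executes it.
final class PooledDispatchSketch {

  // Hypothetical thread-safe shared state touched from every worker thread,
  // standing in for bookkeeping such as the offset manager's.
  static final class CountingTracker {
    private final Map<String, Long> processed = new ConcurrentHashMap<>();

    void record(String partition) {
      // merge is atomic on ConcurrentHashMap, so concurrent workers never lose an update
      processed.merge(partition, 1L, Long::sum);
    }

    long count(String partition) {
      return processed.getOrDefault(partition, 0L);
    }
  }

  // Submits offsets 0..count-1 of a single partition and returns the order in which
  // the workers actually processed them.
  static List<Long> dispatch(int poolSize, int count, CountingTracker tracker) {
    List<Long> processedOrder = Collections.synchronizedList(new ArrayList<>());
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    for (long offset = 0; offset < count; offset++) {
      final long o = offset;
      pool.execute(() -> {
        // In a real task, the (possibly slow) synchronous operator work runs here.
        processedOrder.add(o);
        tracker.record("partition-0");
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processedOrder;
  }
}
{code}

With poolSize = 1 the single worker drains a FIFO queue, so messages are processed in submission order. With poolSize > 1 every message is still processed exactly once, but completion order can differ from submission order, which is the ordering side effect described in the last bullet.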
Rejected alternatives:
# Add an executor within StreamOperatorTask, similar to AsyncStreamTaskAdapter,
that delegates messages to the thread pool. This requires sharing the executor
created in SamzaContainer, similar to how it is done in the finalizeTaskFactory
method.
** It is a hack tailored to StreamOperatorTask that introduces task-specific checks.
** It doesn't solve the problem of a potentially blocking AsyncStreamTask, since
dispatch still happens on the run loop thread.
# Instead of wrapping the operator calls with
CompletableFuture.completedFuture(..), use CompletableFuture.supplyAsync(() -> ...)
** It uses the default Java ForkJoinPool (the common pool)
** It is not tied to our existing job.container.thread.pool.size setting
** It introduces a disparity between the low-level and high-level APIs
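The threading difference behind the second rejected alternative can be shown with plain java.util.concurrent. In this sketch, runOperator() is a hypothetical stand-in for a synchronous operator and newNamedPool() is a hypothetical helper. completedFuture(..) evaluates its argument before the future exists, so the operator runs on the calling thread (the run loop, in this issue). supplyAsync(..) with no executor runs it on ForkJoinPool.commonPool() (or on a new thread per task when the common pool's parallelism is below two), independently of job.container.thread.pool.size. The overload taking an explicit Executor is included only for comparison and is not part of the proposal above.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: which thread runs a synchronous operator under each wrapping style.
final class FutureThreadingSketch {

  // Stand-in for a synchronous (non-async) operator; reports the thread it ran on.
  static String runOperator() {
    return Thread.currentThread().getName();
  }

  // Current style: runOperator() is evaluated eagerly on the calling thread (the run
  // loop in this issue) before the already-completed future is returned.
  static CompletableFuture<String> wrapCompleted() {
    return CompletableFuture.completedFuture(runOperator());
  }

  // Rejected alternative: with no executor, the task runs on ForkJoinPool.commonPool(),
  // or on a new thread per task if the common pool's parallelism is below two.
  static CompletableFuture<String> wrapSupplyAsyncDefault() {
    return CompletableFuture.supplyAsync(FutureThreadingSketch::runOperator);
  }

  // For comparison only: the overload that accepts an explicit Executor.
  static CompletableFuture<String> wrapSupplyAsync(Executor executor) {
    return CompletableFuture.supplyAsync(FutureThreadingSketch::runOperator, executor);
  }

  // Hypothetical helper: a fixed pool whose daemon threads all share one name.
  static ExecutorService newNamedPool(int size, String threadName) {
    return Executors.newFixedThreadPool(size, r -> {
      Thread t = new Thread(r, threadName);
      t.setDaemon(true);
      return t;
    });
  }
}
{code}

Only the first variant keeps the operator on the caller's thread; the other two move it elsewhere, and only the last one lets the caller decide where.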
> Async High Level API does not schedule StreamOperatorTasks on separate threads
> ------------------------------------------------------------------------------
>
> Key: SAMZA-2172
> URL: https://issues.apache.org/jira/browse/SAMZA-2172
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Bharath Kumarasubramanian
> Priority: Major
>
> With the Async High Level API changes, we changed StreamOperatorTask from a
> StreamTask to an AsyncStreamTask. As a result, instead of going through the
> AsyncStreamTaskAdapter, its processAsync() is invoked directly on the run
> loop. Consequently, job.container.thread.pool.size has no effect, and the
> entire DAG (outside of asyncFlatMap) is executed on the run loop thread, one
> TaskInstance at a time.
> {code:java}
> ...
> at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:109)
> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:176)
> at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:174)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:470)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:412)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:346)
> at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:233)
> at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:763)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:150)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:78)
> at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:76)
> {code}
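The difference the issue describes can be sketched with simplified, hypothetical interfaces (SimpleSyncTask, SimpleAsyncTask, PoolAdapter, InlineAsyncTask are illustrative names, not Samza's StreamTask / AsyncStreamTask API). PoolAdapter follows the same idea as AsyncStreamTaskAdapter: the caller only hands the message off and the synchronous work runs on a pool thread. InlineAsyncTask mirrors what the stack trace shows for StreamOperatorTask: processAsync() does the synchronous work inline, so it occupies the calling thread (the run loop) until it finishes.

{code:java}
import java.util.concurrent.Executor;

// Hypothetical sketch with simplified interfaces; these are NOT Samza's StreamTask /
// AsyncStreamTask signatures, only the same shape (sync process vs. callback-based processAsync).
final class AdapterSketch {

  interface SimpleSyncTask {
    void process(String message);
  }

  interface SimpleAsyncTask {
    // Implementations call onComplete once the message has been fully processed.
    void processAsync(String message, Runnable onComplete);
  }

  // In the spirit of AsyncStreamTaskAdapter: the caller (run loop) only hands the
  // message off; the synchronous work runs on a pool thread.
  static final class PoolAdapter implements SimpleAsyncTask {
    private final SimpleSyncTask task;
    private final Executor executor;

    PoolAdapter(SimpleSyncTask task, Executor executor) {
      this.task = task;
      this.executor = executor;
    }

    @Override
    public void processAsync(String message, Runnable onComplete) {
      executor.execute(() -> {
        task.process(message);
        onComplete.run();
      });
    }
  }

  // The situation described in the issue: processAsync() does synchronous work inline,
  // so it blocks whichever thread calls it, and no pool is involved.
  static final class InlineAsyncTask implements SimpleAsyncTask {
    private final SimpleSyncTask task;

    InlineAsyncTask(SimpleSyncTask task) {
      this.task = task;
    }

    @Override
    public void processAsync(String message, Runnable onComplete) {
      task.process(message);
      onComplete.run();
    }
  }
}
{code}

In the inline variant, the message is already complete when processAsync() returns, which is why a slow operator stalls every other TaskInstance sharing the run loop.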
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)