That's right.
To add to it: we added multithreading to Python streaming because a single
thread is suboptimal for the streaming use case.
Shall we move towards a conclusion on the SDK bundle processing upper bound?
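
As a point of reference for that conclusion, here is a minimal sketch of the
startup-time computation Henning suggested further down the thread,
max(1, min(cpu*100, mem_mb/10)), together with a gate that rejects bundles
beyond the bound instead of queueing them. The names max_bundles_in_flight
and BundleGate are hypothetical and not part of the Beam SDK, and the use of
integer division is an assumption made for illustration.

```python
import threading


def max_bundles_in_flight(cpu, mem_mb, user_value=None):
    """Upper bound on concurrent bundles, computed once at harness startup.

    Hypothetical helper: follows max(1, min(cpu*100, mem_mb/10)) from the
    thread; a user-provided value, if any, takes precedence.
    """
    if user_value is not None:
        return max(1, user_value)
    # Integer division is an assumption; the thread only writes mem_mb/10.
    return max(1, min(cpu * 100, mem_mb // 10))


class BundleGate:
    """Hypothetical admission gate: rejects, rather than queues, extra bundles."""

    def __init__(self, limit):
        self._slots = threading.BoundedSemaphore(limit)

    def try_start(self):
        # Non-blocking: returns False immediately when the bound is reached.
        return self._slots.acquire(blocking=False)

    def finish(self):
        # Called when a bundle completes, freeing one slot.
        self._slots.release()
```

A harness built this way would publish the computed limit to the runner and
call try_start() for each incoming bundle, failing the bundle verbosely when
it returns False.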

On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <lc...@google.com> wrote:

> Ankur, I can see where you are going with your argument. I believe some
> information is static and won't change at pipeline creation time (such as
> the Python SDK being most efficient doing one bundle at a time), and some
> is best determined at runtime, like memory and CPU limits and worker
> count.
>
> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <goe...@google.com> wrote:
>
>> I would prefer to keep it dynamic, as it can be changed by the
>> infrastructure or the pipeline author.
>> For example, in Python the number of concurrent bundles can be changed by
>> setting the pipeline option worker_count, and for Java it can be computed
>> based on the CPUs on the machine.
>>
>> For the Flink runner, we can use the worker_count parameter for now to
>> increase the parallelism. We can also have 1 container for each
>> mapPartition task on Flink while reusing containers, as container creation
>> is expensive, especially for Python where it installs a bunch of
>> dependencies. There is 1 caveat though: I have seen machine crashes caused
>> by too many simultaneous container creations. We can rate limit container
>> creation in the code to avoid this.
>>
>> Thanks,
>> Ankur
>>
>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> +1 on making the resources part of a proto. Based upon what Henning
>>> linked to, the provisioning API seems like an appropriate place to provide
>>> this information.
>>>
>>> Thomas, I believe the environment proto is the best place to add
>>> information that a runner may want to know about upfront during
>>> pipeline creation. I wouldn't stick this into PipelineOptions for the long
>>> term.
>>> If you have time to capture these thoughts and update the environment
>>> proto, I would suggest going down that path. Otherwise anything short term
>>> like PipelineOptions will do.
>>>
>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> For SDKs where the upper limit is constant and known upfront, why not
>>>> communicate this along with the other harness resource info as part of the
>>>> job submission?
>>>>
>>>> Regarding use of GRPC headers: Why not make this explicit in the proto
>>>> instead?
>>>>
>>>> WRT the runner dictating resource constraints: the runner actually may
>>>> also not have that information. Would it need to be supplied as part of
>>>> the pipeline options? The cluster resource manager needs to allocate
>>>> resources for both the runner and the SDK harness(es).
>>>>
>>>> Finally, what can be done to unblock the Flink runner / Python until
>>>> the solution discussed here is in place? An extra runner option for SDK
>>>> singleton on/off?
>>>>
>>>>
>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <goe...@google.com> wrote:
>>>>
>>>>> Sounds good to me.
>>>>> GRPC Header of the control channel seems to be a good place to add
>>>>> upper bound information.
>>>>> Added jiras:
>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>> https://issues.apache.org/jira/browse/BEAM-5167
>>>>>
>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <hero...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Regarding resources: the runner can currently dictate the
>>>>>> mem/cpu/disk resources that the harness is allowed to use via the
>>>>>> provisioning api. The SDK harness need not -- and should not -- speculate
>>>>>> on what else might be running on the machine:
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>
>>>>>> A realistic startup-time computation in the SDK harness would be
>>>>>> something simple like max(1, min(cpu*100, mem_mb/10)), say, using at
>>>>>> most that number of threads. Or just hardcode it to 300. Or use a
>>>>>> user-provided value. Whatever the value, it is the maximum number of
>>>>>> bundles in flight allowed at any given time and needs to be
>>>>>> communicated to the runner via some message. Anything beyond it would
>>>>>> be rejected (but this shouldn't happen, because the runner should
>>>>>> respect that number).
>>>>>>
>>>>>> A dynamic computation would use the same limits from the SDK, but
>>>>>> take into account its own resource usage (incl. the usage by running
>>>>>> bundles).
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <goe...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am thinking of the upper bound more along the lines of a theoretical
>>>>>>> upper limit, or any other static high value, beyond which the SDK will
>>>>>>> reject bundles verbosely. The idea is that the SDK will not keep
>>>>>>> bundles in a queue while waiting on current bundles to finish. It will
>>>>>>> simply reject any additional bundle.
>>>>>>> Beyond this I don't have a good answer for a dynamic upper bound. As
>>>>>>> the SDK does not have the complete picture of the processes on the
>>>>>>> machine with which it shares resources, resources might not be a good
>>>>>>> proxy for the upper bound from the SDK's point of view.
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ankur, how would you expect an SDK to compute a realistic upper
>>>>>>>> bound (upfront or during pipeline computation)?
>>>>>>>>
>>>>>>>> The first thought that came to my mind was that the SDK would provide
>>>>>>>> CPU/memory/... resourcing information and the runner would make a
>>>>>>>> judgement call as to whether it should ask the SDK to do more work or
>>>>>>>> less, but it's not an explicit "don't do more than X bundles in
>>>>>>>> parallel".
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Makes sense. Exposing an upper bound on concurrency along with the
>>>>>>>>> optimum concurrency can give a good balance. This is good
>>>>>>>>> information to expose while keeping the requirements on the SDK
>>>>>>>>> simple. An SDK can publish 1 as both the optimum concurrency and the
>>>>>>>>> upper bound to keep things simple.
>>>>>>>>>
>>>>>>>>> Runner introspection of the upper bound on concurrency is important
>>>>>>>>> for correctness, while introspection of the optimum concurrency is
>>>>>>>>> important for efficiency. This separates the efficiency and
>>>>>>>>> correctness requirements.
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>>>>>>> amount of bundles in parallel" is limited by the available
>>>>>>>>>> resources. For example, the Go SDK harness will accept an arbitrary
>>>>>>>>>> amount of parallel work, but too much work will cause either
>>>>>>>>>> excessive GC pressure with crippling slowness or an outright OOM.
>>>>>>>>>> Unless it's always 1, a reasonable upper bound will either have to
>>>>>>>>>> be provided by the user or computed from the mem/cpu resources
>>>>>>>>>> given. Of course, as some bundles take more resources than others,
>>>>>>>>>> any static value will be an estimate or ignore resource limits.
>>>>>>>>>>
>>>>>>>>>> That said, I do not like that an "efficiency" aspect becomes a
>>>>>>>>>> subtle requirement for correctness due to Flink internals. I fear
>>>>>>>>>> that road leads to trouble.
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> The latter case, where the SDK supports only single bundle
>>>>>>>>>>> execution at a time and the runner does not use this flag, is
>>>>>>>>>>> exactly the reason we got into the deadlock here.
>>>>>>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the
>>>>>>>>>>> latter case) and letting the runner decide whether to use it. But
>>>>>>>>>>> at the same time we should expect the SDK to handle an infinite
>>>>>>>>>>> amount of bundles even if it's not efficient.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Ankur
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of two
>>>>>>>>>>>> categories: they can process effectively an infinite amount of
>>>>>>>>>>>> bundles in parallel, or they can only process a single bundle at
>>>>>>>>>>>> a time.
>>>>>>>>>>>>
>>>>>>>>>>>> I believe it is more difficult for a runner to handle the
>>>>>>>>>>>> latter case well and to perform all the environment management
>>>>>>>>>>>> that would make that efficient. It may be inefficient for an SDK,
>>>>>>>>>>>> but I do believe it should be able to say "I'm not great at
>>>>>>>>>>>> anything more than a single bundle at a time", while utilizing
>>>>>>>>>>>> this information should be optional for a runner.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> To recap the discussion, it seems that we have come up with the
>>>>>>>>>>>>> following points.
>>>>>>>>>>>>> SDKHarness Management and initialization.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. The runner completely owns the work assignment to SDKHarness.
>>>>>>>>>>>>>    2. The runner should know the capabilities and capacity of
>>>>>>>>>>>>>    SDKHarness and should assign work accordingly.
>>>>>>>>>>>>>    3. Spinning up SDKHarness is the runner's responsibility,
>>>>>>>>>>>>>    and it can be done statically (a fixed, preconfigured number
>>>>>>>>>>>>>    of SDKHarnesses), dynamically, or based on some other
>>>>>>>>>>>>>    configuration/logic which the runner chooses.
>>>>>>>>>>>>>
>>>>>>>>>>>>> SDKHarness Expectation. This is more in question and we should
>>>>>>>>>>>>> outline the responsibility of SDKHarness.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. SDKHarness should publish how many concurrent tasks it
>>>>>>>>>>>>>    can execute.
>>>>>>>>>>>>>    2. SDKHarness should start executing all the task items
>>>>>>>>>>>>>    assigned in parallel in a timely manner or fail the task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, to add to the simplification side: I think for better
>>>>>>>>>>>>> adoption, we should have a simple SDKHarness as well as simple
>>>>>>>>>>>>> runner integration, to encourage integration with more runners.
>>>>>>>>>>>>> Also, many runners might not expose some of their internal
>>>>>>>>>>>>> scheduling characteristics, so we should not expect scheduling
>>>>>>>>>>>>> characteristics for runner integration. Moreover, scheduling
>>>>>>>>>>>>> characteristics can change based on pipeline type,
>>>>>>>>>>>>> infrastructure, available resources, etc. So I am a bit hesitant
>>>>>>>>>>>>> to add runner scheduling specifics for runner integration.
>>>>>>>>>>>>> A good balance between SDKHarness complexity and runner
>>>>>>>>>>>>> integration can be helpful for easier adoption.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <
>>>>>>>>>>>>> hero...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Finding a good balance is indeed the art of portability,
>>>>>>>>>>>>>> because the range of capability (and assumptions) on both sides
>>>>>>>>>>>>>> is wide.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It was originally the idea to allow the SDK harness to be an
>>>>>>>>>>>>>> extremely simple bundle executor (specifically, single-threaded
>>>>>>>>>>>>>> execution, one instruction at a time), however inefficient -- a
>>>>>>>>>>>>>> more sophisticated SDK harness would support more features and
>>>>>>>>>>>>>> be more efficient. For the issue described here, it seems
>>>>>>>>>>>>>> problematic to me to send non-executable bundles to the SDK
>>>>>>>>>>>>>> harness under the expectation that the SDK harness will
>>>>>>>>>>>>>> concurrently work its way deeply enough down the instruction
>>>>>>>>>>>>>> queue to unblock itself. That would be an extremely subtle
>>>>>>>>>>>>>> requirement for SDK authors, and one practical question becomes:
>>>>>>>>>>>>>> what should an SDK do with a bundle instruction that it doesn't
>>>>>>>>>>>>>> have capacity to execute? If a runner needs to make such
>>>>>>>>>>>>>> assumptions, I think that information should probably rather be
>>>>>>>>>>>>>> explicit along the lines of proposal 1 -- i.e., some kind of
>>>>>>>>>>>>>> negotiation between the resources allotted to the SDK harness (a
>>>>>>>>>>>>>> preliminary variant is in the provisioning api) and what the SDK
>>>>>>>>>>>>>> harness in return can do (and a valid answer might be: 1 bundle
>>>>>>>>>>>>>> at a time irrespective of resources given) or a per-bundle
>>>>>>>>>>>>>> special "overloaded" error response. For other aspects, such as
>>>>>>>>>>>>>> side input readiness, the runner handles that complexity and the
>>>>>>>>>>>>>> overall bias has generally been to move complexity to the runner
>>>>>>>>>>>>>> side.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The SDK harness and initialization overhead is entirely SDK,
>>>>>>>>>>>>>> job type and even pipeline specific. A docker container is also
>>>>>>>>>>>>>> just a process, btw, and doesn't inherently carry much overhead.
>>>>>>>>>>>>>> That said, on a single host, a static docker configuration is
>>>>>>>>>>>>>> generally a lot simpler to work with.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Henning
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think there needs to be a good balance between the SDK
>>>>>>>>>>>>>>> harness capabilities/complexity and responsibilities.
>>>>>>>>>>>>>>> Additionally, the user will need to be able to adjust the
>>>>>>>>>>>>>>> runner behavior, since the type of workload executed in the
>>>>>>>>>>>>>>> harness is also a factor. Elsewhere we already discussed that
>>>>>>>>>>>>>>> the current assumption of a single SDK harness instance per
>>>>>>>>>>>>>>> Flink task manager brings problems with it and that there needs
>>>>>>>>>>>>>>> to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There was the concern that instantiation of multiple SDK
>>>>>>>>>>>>>>> harnesses per TM host is expensive (resource usage,
>>>>>>>>>>>>>>> initialization time, etc.). That may hold true for a specific
>>>>>>>>>>>>>>> scenario, such as batch workloads and the use of Docker
>>>>>>>>>>>>>>> containers. But it may look totally different for a streaming
>>>>>>>>>>>>>>> topology or when the SDK harness is just a process on the same
>>>>>>>>>>>>>>> host.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <
>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing all
>>>>>>>>>>>>>>>> work given to them concurrently. Runners have been responsible
>>>>>>>>>>>>>>>> for choosing how much work to give to the SDK harness in such
>>>>>>>>>>>>>>>> a way that best utilizes the SDK harness.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I understand that multithreading in Python is inefficient
>>>>>>>>>>>>>>>> due to the global interpreter lock. It would be up to the
>>>>>>>>>>>>>>>> runner in this case to make sure that the amount of work it
>>>>>>>>>>>>>>>> gives to each SDK harness best utilizes it while spinning up
>>>>>>>>>>>>>>>> an appropriate number of SDK harnesses.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be
>>>>>>>>>>>>>>>>> Flink's pipelined execution mode. It runs multiple tasks in
>>>>>>>>>>>>>>>>> one task slot and produces a deadlock when the pipelined
>>>>>>>>>>>>>>>>> operators schedule the SDK harness DoFns in non-topological
>>>>>>>>>>>>>>>>> order.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in
>>>>>>>>>>>>>>>>> topological order. Doing that is not easy because they run in
>>>>>>>>>>>>>>>>> separate Flink operators and the SDK Harness would have to
>>>>>>>>>>>>>>>>> have insight into the execution graph (which is not
>>>>>>>>>>>>>>>>> desirable).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure
>>>>>>>>>>>>>>>>> that the number of threads in the SDK harness matches the
>>>>>>>>>>>>>>>>> number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues
>>>>>>>>>>>>>>>>> together horizontal parts of the execution graph, also in
>>>>>>>>>>>>>>>>> multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task
>>>>>>>>>>>>>>>>> > parallelism on SDKHarness.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ job on a Flink
>>>>>>>>>>>>>>>>> >     standalone cluster.
>>>>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>> >   * Everything is running on a single machine with a single
>>>>>>>>>>>>>>>>> >     Flink task manager.
>>>>>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules different dependent
>>>>>>>>>>>>>>>>> > subtasks concurrently on the Flink worker as long as it can
>>>>>>>>>>>>>>>>> > get slots. Each map task is then executed on SDKHarness.
>>>>>>>>>>>>>>>>> > It's possible that MapB gets to SDKHarness before MapA and
>>>>>>>>>>>>>>>>> > hence gets into the execution queue before MapA. Because we
>>>>>>>>>>>>>>>>> > only have 1 execution thread on SDKHarness, MapA will never
>>>>>>>>>>>>>>>>> > get a chance to execute, as MapB will never release the
>>>>>>>>>>>>>>>>> > execution thread: MapB will wait for input from MapA. This
>>>>>>>>>>>>>>>>> > gets us into a deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>> > Set worker_count in pipeline options to more than the
>>>>>>>>>>>>>>>>> > expected number of subtasks in the pipeline.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner and
>>>>>>>>>>>>>>>>> >     make sure that we have more threads than max
>>>>>>>>>>>>>>>>> >     concurrency. This approach assumes that Beam has insight
>>>>>>>>>>>>>>>>> >     into the runner's execution plan and can make decisions
>>>>>>>>>>>>>>>>> >     based on it.
>>>>>>>>>>>>>>>>> >  2. We dynamically create threads and cache them with a
>>>>>>>>>>>>>>>>> >     high upper bound in SDKHarness. We can warn if we are
>>>>>>>>>>>>>>>>> >     hitting the upper bound of threads. This approach
>>>>>>>>>>>>>>>>> >     assumes that the runner does a good job of scheduling
>>>>>>>>>>>>>>>>> >     and will distribute tasks more or less evenly.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer
>>>>>>>>>>>>>>>>> > approach 2. It is simpler to implement and the
>>>>>>>>>>>>>>>>> > implementation is not runner specific. This approach
>>>>>>>>>>>>>>>>> > utilizes resources better, as it creates only as many
>>>>>>>>>>>>>>>>> > threads as needed instead of the peak thread requirement.
>>>>>>>>>>>>>>>>> > And last but not least, it gives the runner control over
>>>>>>>>>>>>>>>>> > managing truly active tasks.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > Please let me know if I am missing something and your
>>>>>>>>>>>>>>>>> > thoughts on the approach.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
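
The deadlock described in the original message can be reproduced in miniature
with a plain Python thread pool. This is a sketch, not Beam or Flink code:
map_a and map_b are hypothetical stand-ins for the two stages, the queue
stands in for the data channel between them, and a timeout stands in for the
indefinite wait so that the example terminates.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(num_threads, timeout_s=0.5):
    """Submit MapB before MapA to a pool with num_threads workers.

    Returns True if MapB received MapA's output, False if it timed out
    (the timeout replaces the real, indefinite deadlock).
    """
    channel = queue.Queue()  # carries MapA's output to MapB

    def map_a():
        channel.put("element")

    def map_b():
        try:
            channel.get(timeout=timeout_s)
            return True
        except queue.Empty:
            return False

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        fut_b = pool.submit(map_b)  # downstream stage arrives first
        pool.submit(map_a)          # upstream stage is queued behind it
        return fut_b.result()
```

With one worker, run_pipeline(1) reports that MapB never received its input,
because MapA is stuck in the queue behind it. With two workers,
run_pipeline(2) completes, which mirrors the worker_count mitigation.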
