I managed to write a small document based on the discussion.
Please take a look at
https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing


On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde <[email protected]> wrote:

> Sending bundles that cannot be executed, i.e., the situation described at
> the beginning of the thread as causing the deadlock in Flink with mapB. The
> discussion of exposing (or assuming an infinitely large) concurrency level
> -- while a useful concept in its own right -- came up as a way to
> unblock mapB.
>
> On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik <[email protected]> wrote:
>
>> Henning, can you clarify what you mean by sending non-executable
>> bundles to the SDK harness, and how it is useful for Flink?
>>
>> On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <[email protected]> wrote:
>>
>>> I think it will be useful for the runner to know upfront what the
>>> fundamental threading capabilities of the SDK harness are (say, "fixed",
>>> "linear", "dynamic", ...) so that the runner can make a good static
>>> decision upfront on the number of harnesses and how many resources they
>>> should each have. It's wasteful to give the Foo SDK a whole many-core
>>> machine with TBs of memory if it can only support a single bundle at a
>>> time. I think this is also in line with what Thomas and Luke are
>>> suggesting.
>>>
>>> However, it still seems to me to be a semantically problematic idea to
>>> send non-executable bundles to the SDK harness. I understand it's useful
>>> for Flink, but is that really the best path forward?
>>>
>>>
>>>
>>> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <[email protected]> wrote:
>>>
>>>> That's right.
>>>> To add to it: we added multithreading to Python streaming because a
>>>> single thread is suboptimal for the streaming use case.
>>>> Shall we move towards a conclusion on the SDK bundle processing upper
>>>> bound?
>>>>
>>>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Ankur, I can see where you are going with your argument. I believe
>>>>> there is certain information which is static and won't change after
>>>>> pipeline creation (such as the Python SDK being most efficient doing
>>>>> one bundle at a time) and some which is best determined at runtime,
>>>>> like memory and CPU limits, or worker count.
>>>>>
>>>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I would prefer to keep it dynamic, as it can be changed by the
>>>>>> infrastructure or the pipeline author.
>>>>>> For example, in the case of Python, the number of concurrent bundles
>>>>>> can be changed by setting the pipeline option worker_count, and for
>>>>>> Java it can be computed based on the CPUs on the machine.
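>>>>>>
>>>>>> As a rough sketch of wiring such a knob through pipeline options (the
>>>>>> option class and default here are illustrative, not actual Beam code):
>>>>>>
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>
>>>>>> class HarnessOptions(PipelineOptions):
>>>>>>   @classmethod
>>>>>>   def _add_argparse_args(cls, parser):
>>>>>>     # Hypothetical knob mirroring the worker_count option above.
>>>>>>     parser.add_argument('--worker_count', type=int, default=1)
>>>>>>
>>>>>> options = PipelineOptions(['--worker_count=4']).view_as(HarnessOptions)
>>>>>> print(options.worker_count)  # prints 4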
>>>>>>
>>>>>> For the Flink runner, we can use the worker_count parameter for now
>>>>>> to increase the parallelism. And we can have one container for each
>>>>>> mapPartition task on Flink while reusing containers, as container
>>>>>> creation is expensive, especially for Python where it installs a
>>>>>> bunch of dependencies. There is one caveat though: I have seen
>>>>>> machines crash because of too many simultaneous container creations.
>>>>>> We can rate limit container creation in the code to avoid this.
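>>>>>>
>>>>>> A minimal sketch of that rate limit, assuming a plain docker
>>>>>> invocation (the limit and helper are illustrative, not actual Beam
>>>>>> code):
>>>>>>
>>>>>> import subprocess
>>>>>> import threading
>>>>>>
>>>>>> # Allow at most two containers to be created at the same time.
>>>>>> _creation_gate = threading.Semaphore(2)
>>>>>>
>>>>>> def start_sdk_container(image):
>>>>>>   with _creation_gate:
>>>>>>     # 'docker run' is the expensive step being throttled.
>>>>>>     return subprocess.check_output(['docker', 'run', '-d', image])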
>>>>>>
>>>>>> Thanks,
>>>>>> Ankur
>>>>>>
>>>>>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> +1 on making the resources part of a proto. Based upon what Henning
>>>>>>> linked to, the provisioning API seems like an appropriate place to 
>>>>>>> provide
>>>>>>> this information.
>>>>>>>
>>>>>>> Thomas, I believe the environment proto is the best place to add
>>>>>>> information that a runner may want to know about upfront during
>>>>>>> pipeline creation. I wouldn't stick this into PipelineOptions for
>>>>>>> the long term.
>>>>>>> If you have time to capture these thoughts and update the
>>>>>>> environment proto, I would suggest going down that path. Otherwise,
>>>>>>> anything short term like PipelineOptions will do.
>>>>>>>
>>>>>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:
>>>>>>>
>>>>>>>> For SDKs where the upper limit is constant and known upfront, why
>>>>>>>> not communicate this along with the other harness resource info as 
>>>>>>>> part of
>>>>>>>> the job submission?
>>>>>>>>
>>>>>>>> Regarding use of GRPC headers: Why not make this explicit in the
>>>>>>>> proto instead?
>>>>>>>>
>>>>>>>> WRT the runner dictating resource constraints: the runner actually
>>>>>>>> may not have that information either. Would it need to be supplied
>>>>>>>> as part of the pipeline options? The cluster resource manager needs
>>>>>>>> to allocate resources for both the runner and the SDK harness(es).
>>>>>>>>
>>>>>>>> Finally, what can be done to unblock the Flink runner / Python
>>>>>>>> until the solution discussed here is in place? An extra runner
>>>>>>>> option for SDK singleton on/off?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sounds good to me.
>>>>>>>>> A gRPC header on the control channel seems to be a good place to
>>>>>>>>> add the upper bound information.
>>>>>>>>> Added jiras:
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5167
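>>>>>>>>>
>>>>>>>>> As a sketch of what such a header could look like from the harness
>>>>>>>>> side (the header name is hypothetical; this is just how metadata
>>>>>>>>> is attached to gRPC calls in general, not actual Beam code):
>>>>>>>>>
>>>>>>>>> import grpc
>>>>>>>>>
>>>>>>>>> channel = grpc.insecure_channel('localhost:8765')
>>>>>>>>> # Metadata sent along with each call on the control channel.
>>>>>>>>> metadata = [('worker_id', 'sdk-1'),
>>>>>>>>>             ('max_concurrent_bundles', '12')]
>>>>>>>>> # e.g. control_stub.Control(request_iterator, metadata=metadata)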
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Regarding resources: the runner can currently dictate the
>>>>>>>>>> mem/cpu/disk resources that the harness is allowed to use via the
>>>>>>>>>> provisioning api. The SDK harness need not -- and should not -- 
>>>>>>>>>> speculate
>>>>>>>>>> on what else might be running on the machine:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>>>>>
>>>>>>>>>> A realistic startup-time computation in the SDK harness would be
>>>>>>>>>> something simple like max(1, min(cpu*100, mem_mb/10)), say, and
>>>>>>>>>> use that as the maximum number of threads. Or just hardcode it to
>>>>>>>>>> 300. Or use a user-provided value. Whatever it is, the value is
>>>>>>>>>> the maximum number of bundles in flight allowed at any given time
>>>>>>>>>> and needs to be communicated to the runner via some message.
>>>>>>>>>> Anything beyond that would be rejected (but this shouldn't
>>>>>>>>>> happen, because the runner should respect that number).
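>>>>>>>>>>
>>>>>>>>>> A direct transcription of that heuristic (the constants are the
>>>>>>>>>> illustrative ones from above, not tuned values):
>>>>>>>>>>
>>>>>>>>>> def max_bundle_threads(cpus, mem_mb):
>>>>>>>>>>   # Scale with CPU and memory; never drop below one thread.
>>>>>>>>>>   return max(1, min(cpus * 100, mem_mb // 10))
>>>>>>>>>>
>>>>>>>>>> max_bundle_threads(4, 16384)  # -> 400 (CPU-bound here)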
>>>>>>>>>>
>>>>>>>>>> A dynamic computation would use the same limits from the SDK, but
>>>>>>>>>> take into account its own resource usage (incl. the usage by running
>>>>>>>>>> bundles).
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am thinking of the upper bound as more along the lines of a
>>>>>>>>>>> theoretical upper limit, or any other static high value, beyond
>>>>>>>>>>> which the SDK will reject bundles verbosely. The idea is that
>>>>>>>>>>> the SDK will not keep bundles in a queue while waiting on
>>>>>>>>>>> current bundles to finish. It will simply reject any additional
>>>>>>>>>>> bundles.
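>>>>>>>>>>>
>>>>>>>>>>> A sketch of that reject-instead-of-queue behavior (the class and
>>>>>>>>>>> limit are illustrative):
>>>>>>>>>>>
>>>>>>>>>>> import threading
>>>>>>>>>>>
>>>>>>>>>>> class BundleGate(object):
>>>>>>>>>>>   def __init__(self, limit):
>>>>>>>>>>>     self._slots = threading.Semaphore(limit)
>>>>>>>>>>>
>>>>>>>>>>>   def try_start(self, bundle_id):
>>>>>>>>>>>     # Non-blocking acquire: never queue, reject loudly instead.
>>>>>>>>>>>     if not self._slots.acquire(blocking=False):
>>>>>>>>>>>       raise RuntimeError('rejecting bundle %s: over limit' % bundle_id)
>>>>>>>>>>>
>>>>>>>>>>>   def finish(self):
>>>>>>>>>>>     self._slots.release()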
>>>>>>>>>>> Beyond this, I don't have a good answer for a dynamic upper
>>>>>>>>>>> bound. As the SDK does not have a complete picture of the
>>>>>>>>>>> processes on the machine with which it shares resources,
>>>>>>>>>>> resources might not be a good proxy for the upper bound from
>>>>>>>>>>> the SDK's point of view.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ankur, how would you expect an SDK to compute a realistic upper
>>>>>>>>>>>> bound (upfront or during pipeline computation)?
>>>>>>>>>>>>
>>>>>>>>>>>> The first thought that came to my mind was that the SDK would
>>>>>>>>>>>> provide CPU/memory/... resourcing information and the runner
>>>>>>>>>>>> would make a judgement call as to whether it should ask the SDK
>>>>>>>>>>>> to do more work or less, but it's not an explicit "don't do
>>>>>>>>>>>> more than X bundles in parallel".
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Makes sense. Having an exposed upper bound on concurrency
>>>>>>>>>>>>> together with an optimum concurrency can give a good balance.
>>>>>>>>>>>>> This is good information to expose while keeping the
>>>>>>>>>>>>> requirements on the SDK simple. An SDK can publish 1 as both
>>>>>>>>>>>>> the optimum concurrency and the upper bound to keep things
>>>>>>>>>>>>> simple.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Runner introspection of the upper bound on concurrency is
>>>>>>>>>>>>> important for correctness, while introspection of the optimum
>>>>>>>>>>>>> concurrency is important for efficiency. This separates the
>>>>>>>>>>>>> efficiency and correctness requirements.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with Luke's observation, with the caveat that an
>>>>>>>>>>>>>> "infinite amount of bundles in parallel" is limited by the
>>>>>>>>>>>>>> available resources. For example, the Go SDK harness will
>>>>>>>>>>>>>> accept an arbitrary amount of parallel work, but too much
>>>>>>>>>>>>>> work will cause either excessive GC pressure with crippling
>>>>>>>>>>>>>> slowness or an outright OOM. Unless it's always 1, a
>>>>>>>>>>>>>> reasonable upper bound will either have to be provided by the
>>>>>>>>>>>>>> user or computed from the mem/cpu resources given. Of course,
>>>>>>>>>>>>>> since some bundles take more resources than others, any
>>>>>>>>>>>>>> static value will be an estimate or ignore resource limits.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That said, I do not like that an "efficiency" aspect becomes
>>>>>>>>>>>>>> a subtle requirement for correctness due to Flink internals. I 
>>>>>>>>>>>>>> fear that
>>>>>>>>>>>>>> road leads to trouble.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The latter case, where the SDK supports single-bundle
>>>>>>>>>>>>>>> execution at a time and the runner does not use this flag,
>>>>>>>>>>>>>>> is exactly how we got into the deadlock here.
>>>>>>>>>>>>>>> I agree with exposing the SDK's optimum concurrency level
>>>>>>>>>>>>>>> (1 in the latter case) and letting the runner decide whether
>>>>>>>>>>>>>>> to use it, while at the same time expecting the SDK to
>>>>>>>>>>>>>>> handle an infinite amount of bundles even if it's not
>>>>>>>>>>>>>>> efficient.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of
>>>>>>>>>>>>>>>> two capability classes: able to process an effectively
>>>>>>>>>>>>>>>> infinite number of bundles in parallel, or only able to
>>>>>>>>>>>>>>>> process a single bundle at a time.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I believe it is more difficult for a runner to handle the
>>>>>>>>>>>>>>>> latter case well and to perform all the environment
>>>>>>>>>>>>>>>> management that would make it efficient. It may be
>>>>>>>>>>>>>>>> inefficient for an SDK, but I do believe it should be able
>>>>>>>>>>>>>>>> to say "I'm not great at anything more than a single bundle
>>>>>>>>>>>>>>>> at a time"; utilizing this information should be optional
>>>>>>>>>>>>>>>> for a runner.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To recap, the discussion seems to have converged on the
>>>>>>>>>>>>>>>>> following points.
>>>>>>>>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    1. The runner completely owns the work assignment to
>>>>>>>>>>>>>>>>>    the SDKHarness.
>>>>>>>>>>>>>>>>>    2. The runner should know the capabilities and
>>>>>>>>>>>>>>>>>    capacity of the SDKHarness and should assign work
>>>>>>>>>>>>>>>>>    accordingly.
>>>>>>>>>>>>>>>>>    3. Spinning up the SDKHarness is the runner's
>>>>>>>>>>>>>>>>>    responsibility, and it can be done statically (a
>>>>>>>>>>>>>>>>>    fixed, preconfigured number of SDKHarnesses),
>>>>>>>>>>>>>>>>>    dynamically, or based on whatever other
>>>>>>>>>>>>>>>>>    configuration/logic the runner chooses.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> SDKHarness expectations. This is more in question, and we
>>>>>>>>>>>>>>>>> should outline the responsibilities of the SDKHarness:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    1. The SDKHarness should publish how many concurrent
>>>>>>>>>>>>>>>>>    tasks it can execute.
>>>>>>>>>>>>>>>>>    2. The SDKHarness should start executing all the tasks
>>>>>>>>>>>>>>>>>    assigned, in parallel, in a timely manner, or fail the
>>>>>>>>>>>>>>>>>    task.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, on the simplification side: for better adoption, I
>>>>>>>>>>>>>>>>> think we should have a simple SDKHarness as well as
>>>>>>>>>>>>>>>>> simple runner integration, to encourage integration with
>>>>>>>>>>>>>>>>> more runners. Many runners might not expose some of their
>>>>>>>>>>>>>>>>> internal scheduling characteristics, so we should not
>>>>>>>>>>>>>>>>> expect scheduling characteristics as part of runner
>>>>>>>>>>>>>>>>> integration. Moreover, scheduling characteristics can
>>>>>>>>>>>>>>>>> change based on pipeline type, infrastructure, available
>>>>>>>>>>>>>>>>> resources, etc., so I am a bit hesitant to add runner
>>>>>>>>>>>>>>>>> scheduling specifics to runner integration.
>>>>>>>>>>>>>>>>> A good balance between SDKHarness complexity and runner
>>>>>>>>>>>>>>>>> integration can make adoption easier.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Finding a good balance is indeed the art of portability,
>>>>>>>>>>>>>>>>>> because the range of capability (and assumptions) on both 
>>>>>>>>>>>>>>>>>> sides is wide.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It was originally the idea to allow the SDK harness to
>>>>>>>>>>>>>>>>>> be an extremely simple bundle executor (specifically,
>>>>>>>>>>>>>>>>>> single-threaded execution, one instruction at a time),
>>>>>>>>>>>>>>>>>> however inefficient -- a more sophisticated SDK harness
>>>>>>>>>>>>>>>>>> would support more features and be more efficient. For
>>>>>>>>>>>>>>>>>> the issue described here, it seems problematic to me to
>>>>>>>>>>>>>>>>>> send non-executable bundles to the SDK harness under the
>>>>>>>>>>>>>>>>>> expectation that the SDK harness will concurrently work
>>>>>>>>>>>>>>>>>> its way deeply enough down the instruction queue to
>>>>>>>>>>>>>>>>>> unblock itself. That would be an extremely subtle
>>>>>>>>>>>>>>>>>> requirement for SDK authors, and one practical question
>>>>>>>>>>>>>>>>>> becomes: what should an SDK do with a bundle instruction
>>>>>>>>>>>>>>>>>> that it doesn't have capacity to execute? If a runner
>>>>>>>>>>>>>>>>>> needs to make such assumptions, I think that information
>>>>>>>>>>>>>>>>>> should probably rather be explicit along the lines of
>>>>>>>>>>>>>>>>>> proposal 1 -- i.e., some kind of negotiation between the
>>>>>>>>>>>>>>>>>> resources allotted to the SDK harness (a preliminary
>>>>>>>>>>>>>>>>>> variant is in the provisioning api) and what the SDK
>>>>>>>>>>>>>>>>>> harness in return can do (and a valid answer might be: 1
>>>>>>>>>>>>>>>>>> bundle at a time irrespective of the resources given),
>>>>>>>>>>>>>>>>>> or a per-bundle special "overloaded" error response. For
>>>>>>>>>>>>>>>>>> other aspects, such as side input readiness, the runner
>>>>>>>>>>>>>>>>>> handles that complexity, and the overall bias has
>>>>>>>>>>>>>>>>>> generally been to move complexity to the runner side.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The SDK harness and its initialization overhead are
>>>>>>>>>>>>>>>>>> entirely SDK-, job-type- and even pipeline-specific. A
>>>>>>>>>>>>>>>>>> docker container is also just a process, btw, and
>>>>>>>>>>>>>>>>>> doesn't inherently carry much overhead. That said, on a
>>>>>>>>>>>>>>>>>> single host, a static docker configuration is generally
>>>>>>>>>>>>>>>>>> a lot simpler to work with.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Henning
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think there needs to be a good balance between the
>>>>>>>>>>>>>>>>>>> SDK harness capabilities/complexity and its
>>>>>>>>>>>>>>>>>>> responsibilities. Additionally, the user will need to
>>>>>>>>>>>>>>>>>>> be able to adjust the runner behavior, since the type
>>>>>>>>>>>>>>>>>>> of workload executed in the harness is also a factor.
>>>>>>>>>>>>>>>>>>> Elsewhere we already discussed that the current
>>>>>>>>>>>>>>>>>>> assumption of a single SDK harness instance per Flink
>>>>>>>>>>>>>>>>>>> task manager brings problems with it, and that there
>>>>>>>>>>>>>>>>>>> needs to be more than one way for the runner to spin up
>>>>>>>>>>>>>>>>>>> SDK harnesses.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There was the concern that instantiation of multiple
>>>>>>>>>>>>>>>>>>> SDK harnesses per TM host is expensive (resource usage,
>>>>>>>>>>>>>>>>>>> initialization time, etc.). That may hold true for a
>>>>>>>>>>>>>>>>>>> specific scenario, such as batch workloads and the use
>>>>>>>>>>>>>>>>>>> of Docker containers. But it may look totally different
>>>>>>>>>>>>>>>>>>> for a streaming topology, or when the SDK harness is
>>>>>>>>>>>>>>>>>>> just a process on the same host.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing
>>>>>>>>>>>>>>>>>>>> all work given to them concurrently. Runners have been
>>>>>>>>>>>>>>>>>>>> responsible for choosing how much work to give to the
>>>>>>>>>>>>>>>>>>>> SDK harness in such a way that best utilizes it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I understand that multithreading in Python is
>>>>>>>>>>>>>>>>>>>> inefficient due to the global interpreter lock; it
>>>>>>>>>>>>>>>>>>>> would be up to the runner in this case to make sure
>>>>>>>>>>>>>>>>>>>> that the amount of work it gives to each SDK harness
>>>>>>>>>>>>>>>>>>>> best utilizes it, while spinning up an appropriate
>>>>>>>>>>>>>>>>>>>> number of SDK harnesses.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems
>>>>>>>>>>>>>>>>>>>>> to be Flink's pipelined execution mode. It runs
>>>>>>>>>>>>>>>>>>>>> multiple tasks in one task slot and produces a
>>>>>>>>>>>>>>>>>>>>> deadlock when the pipelined operators schedule the
>>>>>>>>>>>>>>>>>>>>> SDK harness DoFns in non-topological order.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the
>>>>>>>>>>>>>>>>>>>>> tasks in topological order. Doing that is not easy,
>>>>>>>>>>>>>>>>>>>>> because they run in separate Flink operators and the
>>>>>>>>>>>>>>>>>>>>> SDK harness would have to have insight into the
>>>>>>>>>>>>>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to
>>>>>>>>>>>>>>>>>>>>> ensure that the number of threads in the SDK harness
>>>>>>>>>>>>>>>>>>>>> matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It
>>>>>>>>>>>>>>>>>>>>> glues together horizontal parts of the execution
>>>>>>>>>>>>>>>>>>>>> graph, also in multiple threads. So I agree with your
>>>>>>>>>>>>>>>>>>>>> proposed solution.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > tl;dr: Deadlock in task execution caused by limited
>>>>>>>>>>>>>>>>>>>>> > task parallelism on the SDKHarness.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> >   * Job type: Beam Portable Python Batch job on a
>>>>>>>>>>>>>>>>>>>>> >     Flink standalone cluster.
>>>>>>>>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>>>>>> >   * Everything is running on a single machine with
>>>>>>>>>>>>>>>>>>>>> >     a single Flink task manager.
>>>>>>>>>>>>>>>>>>>>> >   * Flink task manager slots: 1.
>>>>>>>>>>>>>>>>>>>>> >   * Flink parallelism: 1.
>>>>>>>>>>>>>>>>>>>>> >   * The Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules the
>>>>>>>>>>>>>>>>>>>>> > dependent sub-tasks concurrently on the Flink
>>>>>>>>>>>>>>>>>>>>> > worker as long as it can get slots. Each map task
>>>>>>>>>>>>>>>>>>>>> > is then executed on the SDKHarness.
>>>>>>>>>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness
>>>>>>>>>>>>>>>>>>>>> > before MapA and hence gets into the execution queue
>>>>>>>>>>>>>>>>>>>>> > before MapA. Because we only have 1 execution
>>>>>>>>>>>>>>>>>>>>> > thread on the SDKHarness, MapA will never get a
>>>>>>>>>>>>>>>>>>>>> > chance to execute, as MapB will never release the
>>>>>>>>>>>>>>>>>>>>> > execution thread while waiting for input from MapA.
>>>>>>>>>>>>>>>>>>>>> > This gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more
>>>>>>>>>>>>>>>>>>>>> > than the expected number of sub-tasks in the
>>>>>>>>>>>>>>>>>>>>> > pipeline.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the
>>>>>>>>>>>>>>>>>>>>> >     runner and make sure that we have more threads
>>>>>>>>>>>>>>>>>>>>> >     than the max concurrency. This approach assumes
>>>>>>>>>>>>>>>>>>>>> >     that Beam has insight into the runner's
>>>>>>>>>>>>>>>>>>>>> >     execution plan and can make decisions based on
>>>>>>>>>>>>>>>>>>>>> >     it.
>>>>>>>>>>>>>>>>>>>>> >  2. We dynamically create threads and cache them,
>>>>>>>>>>>>>>>>>>>>> >     with a high upper bound, in the SDKHarness. We
>>>>>>>>>>>>>>>>>>>>> >     can warn if we are hitting the upper bound of
>>>>>>>>>>>>>>>>>>>>> >     threads. This approach assumes that the runner
>>>>>>>>>>>>>>>>>>>>> >     does a good job of scheduling and will
>>>>>>>>>>>>>>>>>>>>> >     distribute tasks more or less evenly.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer
>>>>>>>>>>>>>>>>>>>>> > approach 2 (sketched below). It is simpler to
>>>>>>>>>>>>>>>>>>>>> > implement and the implementation is not runner
>>>>>>>>>>>>>>>>>>>>> > specific. This approach makes better use of
>>>>>>>>>>>>>>>>>>>>> > resources, as it creates only as many threads as
>>>>>>>>>>>>>>>>>>>>> > needed instead of the peak thread requirement. And
>>>>>>>>>>>>>>>>>>>>> > last but not least, it gives the runner control
>>>>>>>>>>>>>>>>>>>>> > over managing the truly active tasks.
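>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > A minimal sketch of approach 2 (the class name and
>>>>>>>>>>>>>>>>>>>>> > the bound are illustrative, not actual Beam code):
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > import logging
>>>>>>>>>>>>>>>>>>>>> > import threading
>>>>>>>>>>>>>>>>>>>>> > from concurrent.futures import ThreadPoolExecutor
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > UPPER_BOUND = 100  # high static cap, illustrative
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > class GrowingWorkerPool(object):
>>>>>>>>>>>>>>>>>>>>> >   # Creates threads on demand, caches them, and
>>>>>>>>>>>>>>>>>>>>> >   # warns when the upper bound is reached.
>>>>>>>>>>>>>>>>>>>>> >   def __init__(self):
>>>>>>>>>>>>>>>>>>>>> >     self._pool = ThreadPoolExecutor(max_workers=UPPER_BOUND)
>>>>>>>>>>>>>>>>>>>>> >     self._active = 0
>>>>>>>>>>>>>>>>>>>>> >     self._lock = threading.Lock()
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> >   def submit(self, bundle_fn):
>>>>>>>>>>>>>>>>>>>>> >     with self._lock:
>>>>>>>>>>>>>>>>>>>>> >       self._active += 1
>>>>>>>>>>>>>>>>>>>>> >       if self._active >= UPPER_BOUND:
>>>>>>>>>>>>>>>>>>>>> >         logging.warning('hit thread upper bound (%d)', UPPER_BOUND)
>>>>>>>>>>>>>>>>>>>>> >     future = self._pool.submit(bundle_fn)
>>>>>>>>>>>>>>>>>>>>> >     future.add_done_callback(self._bundle_done)
>>>>>>>>>>>>>>>>>>>>> >     return future
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> >   def _bundle_done(self, _future):
>>>>>>>>>>>>>>>>>>>>> >     with self._lock:
>>>>>>>>>>>>>>>>>>>>> >       self._active -= 1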
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > Please let me know if I am missing something, and
>>>>>>>>>>>>>>>>>>>>> > share your thoughts on the approach.
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
