For SDKs where the upper limit is constant and known upfront, why not
communicate this along with the other harness resource info as part of the
job submission?

Regarding use of GRPC headers: Why not make this explicit in the proto
instead?

WRT the runner dictating resource constraints: The runner actually may also
not have that information. It would need to be supplied as part of the
pipeline options? The cluster resource manager needs to allocate resources
for both the runner and the SDK harness(es).

Finally, what can be done to unblock the Flink runner / Python until the
solution discussed here is in place? An extra runner option to toggle the
SDK harness singleton on/off?
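
(Purely for illustration, such a toggle could be a custom pipeline option on
the Python side -- the flag name below is made up, not an existing option:)

    from apache_beam.options.pipeline_options import PipelineOptions

    class FlinkHarnessOptions(PipelineOptions):
        """Hypothetical options for controlling SDK harness placement."""

        @classmethod
        def _add_argparse_args(cls, parser):
            # Illustrative flag only; the real name/shape would come out
            # of this discussion.
            parser.add_argument(
                '--force_sdk_harness_per_subtask',
                default=False,
                action='store_true',
                help='Disable the single shared SDK harness per task '
                     'manager and start one harness per subtask instead.')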


On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <goe...@google.com> wrote:

> Sounds good to me.
> A GRPC header on the control channel seems to be a good place to add the
> upper bound information.
> Added jiras:
> https://issues.apache.org/jira/browse/BEAM-5166
> https://issues.apache.org/jira/browse/BEAM-5167
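>
> (For illustration, the upper bound could be attached as call metadata when
> the SDK harness opens the control stream; the header key below is made up,
> and the stub/servicer usage is only sketched in comments:)
>
>     def control_channel_metadata(max_concurrent_bundles):
>         # Hypothetical metadata key; the actual name would need to be
>         # standardized.
>         return [('x-beam-sdk-max-bundle-concurrency',
>                  str(max_concurrent_bundles))]
>
>     # SDK harness side (sketch): pass it when opening the control stream,
>     #   stub.Control(response_iterator,
>     #                metadata=control_channel_metadata(300))
>     # Runner side (sketch): read it in the control service servicer via
>     #   context.invocation_metadata()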
>
> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <hero...@google.com> wrote:
>
>> Regarding resources: the runner can currently dictate the mem/cpu/disk
>> resources that the harness is allowed to use via the provisioning api. The
>> SDK harness need not -- and should not -- speculate on what else might be
>> running on the machine:
>>
>>
>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>
>> A realistic startup-time computation in the SDK harness would be
>> something simple like max(1, min(cpu*100, mem_mb/10)), say, and use that
>> as the maximum number of threads. Or just hardcode it to 300. Or use a
>> user-provided value. Whatever the value is, it is the maximum number of
>> bundles in flight allowed at any given time and needs to be communicated
>> to the runner via some message. Anything beyond that would be rejected
>> (but this shouldn't happen, because the runner should respect that number).
>>
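>> (To make that concrete, a rough sketch of such a startup-time computation;
>> here the cpu/memory numbers are plain parameters, in practice they would
>> come from the provisioning api's resource limits:)
>>
>>     def max_bundle_concurrency(cpu_cores, mem_mb, user_override=None):
>>         """Static upper bound on concurrent bundles for this harness."""
>>         if user_override is not None:
>>             return user_override
>>         # Scale with CPU, cap by memory, never go below a single bundle.
>>         return max(1, min(int(cpu_cores * 100), mem_mb // 10))
>>
>>     # e.g. 4 cores and 16 GiB => min(400, 1638) => at most 400 bundles
>>     print(max_bundle_concurrency(4, 16 * 1024))
>>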
>> A dynamic computation would use the same limits from the SDK, but take
>> into account its own resource usage (incl. the usage by running bundles).
>>
>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <goe...@google.com> wrote:
>>
>>> I am thinking of the upper bound as more along the lines of a theoretical
>>> upper limit, or any other static high value, beyond which the SDK will
>>> reject bundles verbosely. The idea is that the SDK will not keep bundles
>>> in a queue while waiting on current bundles to finish. It will simply
>>> reject any additional bundle.
>>> Beyond this I don't have a good answer for a dynamic upper bound. As the
>>> SDK does not have the complete picture of the processes on the machine
>>> with which it shares resources, resources might not be a good proxy for
>>> an upper bound from the SDK's point of view.
>>>
>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Ankur, how would you expect an SDK to compute a realistic upper bound
>>>> (upfront or during pipeline computation)?
>>>>
>>>> The first thought that came to my mind was that the SDK would provide
>>>> CPU/memory/... resourcing information and the runner would make a
>>>> judgement call as to whether it should ask the SDK to do more or less
>>>> work, but it's not an explicit "don't do more than X bundles in parallel".
>>>>
>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com> wrote:
>>>>
>>>>> Makes sense. Having an exposed upper bound on concurrency along with an
>>>>> optimum concurrency can give a good balance. This is good information to
>>>>> expose while keeping the requirements on the SDK simple. An SDK can
>>>>> publish 1 as both the optimum concurrency and the upper bound to keep
>>>>> things simple.
>>>>>
>>>>> Runner introspection of the upper bound on concurrency is important for
>>>>> correctness, while introspection of the optimum concurrency is important
>>>>> for efficiency. This separates the efficiency and correctness
>>>>> requirements.
>>>>>
>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>>> amount of bundles in parallel" is limited by the available resources.
>>>>>> For example, the Go SDK harness will accept an arbitrary amount of
>>>>>> parallel work, but too much work will cause either excessive GC
>>>>>> pressure with crippling slowness or an outright OOM. Unless it's
>>>>>> always 1, a reasonable upper bound will either have to be provided by
>>>>>> the user or computed from the mem/cpu resources given. Of course, some
>>>>>> bundles take more resources than others, so any static value will be
>>>>>> an estimate or will ignore resource limits.
>>>>>>
>>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle
>>>>>> requirement for correctness due to Flink internals. I fear that road
>>>>>> leads to trouble.
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The latter case -- the SDK supporting only single-bundle execution at
>>>>>>> a time and the runner not using this flag -- is exactly the reason we
>>>>>>> got into the deadlock here.
>>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the
>>>>>>> latter case) and letting the runner decide whether to use it or not.
>>>>>>> But at the same time I expect the SDK to handle an infinite amount of
>>>>>>> bundles even if it's not efficient.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ankur
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I believe in practice SDK harnesses will fall into one of two
>>>>>>>> capability classes: they can effectively process an infinite amount
>>>>>>>> of bundles in parallel, or they can only process a single bundle at
>>>>>>>> a time.
>>>>>>>>
>>>>>>>> I believe it is more difficult for a runner to handle the latter
>>>>>>>> case well and to perform all the environment management that would
>>>>>>>> make that efficient. It may be inefficient for an SDK, but I do
>>>>>>>> believe it should be able to say "I'm not great at anything more
>>>>>>>> than a single bundle at a time"; whether a runner utilizes this
>>>>>>>> information should be optional.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To recap the discussion, it seems that we have come up with the
>>>>>>>>> following points.
>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>
>>>>>>>>>    1. The runner completely owns the work assignment to the
>>>>>>>>>    SDKHarness.
>>>>>>>>>    2. The runner should know the capabilities and capacity of the
>>>>>>>>>    SDKHarness and should assign work accordingly.
>>>>>>>>>    3. Spinning up SDKHarnesses is the runner's responsibility, and
>>>>>>>>>    it can be done statically (a fixed, preconfigured number of
>>>>>>>>>    SDKHarnesses), dynamically, or based on whatever other
>>>>>>>>>    configuration/logic the runner chooses.
>>>>>>>>>
>>>>>>>>> SDKHarness expectations. This is more in question and we should
>>>>>>>>> outline the responsibilities of the SDKHarness:
>>>>>>>>>
>>>>>>>>>    1. The SDKHarness should publish how many concurrent tasks it
>>>>>>>>>    can execute.
>>>>>>>>>    2. The SDKHarness should start executing all the task items
>>>>>>>>>    assigned to it in parallel in a timely manner, or fail the task.
>>>>>>>>>
>>>>>>>>> Also, on the simplification side: I think that for better adoption
>>>>>>>>> we should have a simple SDKHarness as well as a simple runner
>>>>>>>>> integration, to encourage integration with more runners. Many
>>>>>>>>> runners might not expose some of their internal scheduling
>>>>>>>>> characteristics, so we should not expect scheduling characteristics
>>>>>>>>> as part of runner integration. Moreover, scheduling characteristics
>>>>>>>>> can change based on pipeline type, infrastructure, available
>>>>>>>>> resources, etc. So I am a bit hesitant to add runner scheduling
>>>>>>>>> specifics to the runner integration.
>>>>>>>>> A good balance between SDKHarness complexity and runner integration
>>>>>>>>> complexity can make adoption easier.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Ankur
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <hero...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Finding a good balance is indeed the art of portability, because
>>>>>>>>>> the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>>
>>>>>>>>>> It was originally the idea to allow the SDK harness to be an
>>>>>>>>>> extremely simple bundle executor (specifically, single-threaded
>>>>>>>>>> execution of one instruction at a time), however inefficient -- a
>>>>>>>>>> more sophisticated SDK harness would support more features and be
>>>>>>>>>> more efficient. For the issue described here, it seems problematic
>>>>>>>>>> to me to send non-executable bundles to the SDK harness under the
>>>>>>>>>> expectation that the SDK harness will concurrently work its way
>>>>>>>>>> deeply enough down the instruction queue to unblock itself. That
>>>>>>>>>> would be an extremely subtle requirement for SDK authors, and one
>>>>>>>>>> practical question becomes: what should an SDK do with a bundle
>>>>>>>>>> instruction that it doesn't have capacity to execute? If a runner
>>>>>>>>>> needs to make such assumptions, I think that information should
>>>>>>>>>> probably rather be explicit along the lines of proposal 1 -- i.e.,
>>>>>>>>>> some kind of negotiation between the resources allotted to the SDK
>>>>>>>>>> harness (a preliminary variant is in the provisioning api) and what
>>>>>>>>>> the SDK harness in return can do (and a valid answer might be: 1
>>>>>>>>>> bundle at a time irrespective of the resources given), or a
>>>>>>>>>> per-bundle special "overloaded" error response. For other aspects,
>>>>>>>>>> such as side input readiness, the runner handles that complexity,
>>>>>>>>>> and the overall bias has generally been to move complexity to the
>>>>>>>>>> runner side.
>>>>>>>>>>
>>>>>>>>>> The SDK harness and its initialization overhead are entirely SDK,
>>>>>>>>>> job type and even pipeline specific. A docker container is also
>>>>>>>>>> just a process, btw, and doesn't inherently carry much overhead.
>>>>>>>>>> That said, on a single host, a static docker configuration is
>>>>>>>>>> generally a lot simpler to work with.
>>>>>>>>>>
>>>>>>>>>> Henning
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>
>>>>>>>>>>> I think there needs to be a good balance between the SDK harness
>>>>>>>>>>> capabilities/complexity and its responsibilities. Additionally,
>>>>>>>>>>> the user will need to be able to adjust the runner behavior, since
>>>>>>>>>>> the type of workload executed in the harness is also a factor.
>>>>>>>>>>> Elsewhere we already discussed that the current assumption of a
>>>>>>>>>>> single SDK harness instance per Flink task manager brings problems
>>>>>>>>>>> with it, and that there needs to be more than one way for the
>>>>>>>>>>> runner to spin up SDK harnesses.
>>>>>>>>>>>
>>>>>>>>>>> There was the concern that instantiation of multiple SDK harnesses
>>>>>>>>>>> per TM host is expensive (resource usage, initialization time
>>>>>>>>>>> etc.). That may hold true for a specific scenario, such as batch
>>>>>>>>>>> workloads and the use of Docker containers. But it may look
>>>>>>>>>>> totally different for a streaming topology or when the SDK harness
>>>>>>>>>>> is just a process on the same host.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> SDK harnesses were always responsible for executing all work
>>>>>>>>>>>> given to them concurrently. Runners have been responsible for
>>>>>>>>>>>> choosing how much work to give to the SDK harness in such a way
>>>>>>>>>>>> that best utilizes the SDK harness.
>>>>>>>>>>>>
>>>>>>>>>>>> I understand that multithreading in Python is inefficient due to
>>>>>>>>>>>> the global interpreter lock; it would be up to the runner in this
>>>>>>>>>>>> case to make sure that the amount of work it gives to each SDK
>>>>>>>>>>>> harness best utilizes it, while spinning up an appropriate number
>>>>>>>>>>>> of SDK harnesses.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be
>>>>>>>>>>>>> Flink's
>>>>>>>>>>>>> pipelined execution mode. It runs multiple tasks in one task
>>>>>>>>>>>>> slot and
>>>>>>>>>>>>> produces a deadlock when the pipelined operators schedule the
>>>>>>>>>>>>> SDK
>>>>>>>>>>>>> harness DoFns in non-topological order.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in
>>>>>>>>>>>>> topological
>>>>>>>>>>>>> order. Doing that is not easy because they run in separate
>>>>>>>>>>>>> Flink
>>>>>>>>>>>>> operators and the SDK Harness would have to have insight into
>>>>>>>>>>>>> the
>>>>>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>>>>>
>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that
>>>>>>>>>>>>> the number of threads in the SDK harness matches the number of
>>>>>>>>>>>>> ExecutableStage DoFn operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues
>>>>>>>>>>>>> together
>>>>>>>>>>>>> horizontal parts of the execution graph, also in multiple
>>>>>>>>>>>>> threads. So I
>>>>>>>>>>>>> agree with your proposed solution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Max
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task
>>>>>>>>>>>>> > parallelism on the SDKHarness.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ Job on Flink
>>>>>>>>>>>>> standalone
>>>>>>>>>>>>> >     cluster.
>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>> >   * Everything is running on a single machine with single
>>>>>>>>>>>>> Flink task
>>>>>>>>>>>>> >     manager.
>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>> >
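>>>>>>>>>>>>> > (A minimal Python skeleton of that shape, with Create and
>>>>>>>>>>>>> > WriteToText standing in for the real source and sink:)
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >     import apache_beam as beam
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >     with beam.Pipeline() as p:
>>>>>>>>>>>>> >         (p
>>>>>>>>>>>>> >          | 'Read' >> beam.Create(range(100))
>>>>>>>>>>>>> >          | 'MapA' >> beam.Map(lambda x: (x % 10, x))
>>>>>>>>>>>>> >          | 'GroupBy' >> beam.GroupByKey()
>>>>>>>>>>>>> >          | 'MapB' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>>>>>>>>>>>>> >          | 'WriteToSink' >> beam.io.WriteToText('/tmp/out'))
>>>>>>>>>>>>> >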
>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>> > With a multi-stage job, Flink schedules different dependent
>>>>>>>>>>>>> > sub-tasks concurrently on the Flink worker as long as it can
>>>>>>>>>>>>> > get slots. Each map task is then executed on the SDKHarness.
>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and
>>>>>>>>>>>>> > hence gets into the execution queue before MapA. Because we
>>>>>>>>>>>>> > only have 1 execution thread on the SDKHarness, MapA will
>>>>>>>>>>>>> > never get a chance to execute, as MapB will never release the
>>>>>>>>>>>>> > execution thread. MapB will wait for input from MapA. This
>>>>>>>>>>>>> > gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the
>>>>>>>>>>>>> > expected number of sub-tasks in the pipeline.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner and
>>>>>>>>>>>>> >     make sure that we have more threads than the max
>>>>>>>>>>>>> >     concurrency. This approach assumes that Beam has insight
>>>>>>>>>>>>> >     into the runner's execution plan and can make decisions
>>>>>>>>>>>>> >     based on it.
>>>>>>>>>>>>> >  2. We dynamically create threads and cache them, with a high
>>>>>>>>>>>>> >     upper bound, in the SDKHarness. We can warn if we are
>>>>>>>>>>>>> >     hitting the upper bound of threads. This approach assumes
>>>>>>>>>>>>> >     that the runner does a good job of scheduling and will
>>>>>>>>>>>>> >     distribute tasks more or less evenly (a rough sketch
>>>>>>>>>>>>> >     follows below).
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach
>>>>>>>>>>>>> > 2. It is simpler to implement and the implementation is not
>>>>>>>>>>>>> > runner specific. This approach better utilizes resources, as
>>>>>>>>>>>>> > it creates only as many threads as needed instead of the peak
>>>>>>>>>>>>> > thread requirement.
>>>>>>>>>>>>> > And last but not least, it gives the runner control over
>>>>>>>>>>>>> > managing truly active tasks.
>>>>>>>>>>>>> >
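>>>>>>>>>>>>> > (A rough sketch of what proposal 2 could look like in the
>>>>>>>>>>>>> > Python harness -- the cap and names are only illustrative; a
>>>>>>>>>>>>> > standard ThreadPoolExecutor already creates threads lazily as
>>>>>>>>>>>>> > work arrives, up to its max_workers:)
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >     import logging
>>>>>>>>>>>>> >     import threading
>>>>>>>>>>>>> >     from concurrent.futures import ThreadPoolExecutor
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >     class BundleProcessorPool(object):
>>>>>>>>>>>>> >         """Runs bundles on a pool that grows on demand up to a cap."""
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >         def __init__(self, max_threads=100):
>>>>>>>>>>>>> >             self._pool = ThreadPoolExecutor(max_workers=max_threads)
>>>>>>>>>>>>> >             self._max_threads = max_threads
>>>>>>>>>>>>> >             self._active = 0
>>>>>>>>>>>>> >             self._lock = threading.Lock()
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >         def submit(self, process_bundle_fn):
>>>>>>>>>>>>> >             with self._lock:
>>>>>>>>>>>>> >                 self._active += 1
>>>>>>>>>>>>> >                 if self._active >= self._max_threads:
>>>>>>>>>>>>> >                     # Warn when we hit the upper bound, as in 2.
>>>>>>>>>>>>> >                     logging.warning(
>>>>>>>>>>>>> >                         'All %d threads busy; bundles will queue.',
>>>>>>>>>>>>> >                         self._max_threads)
>>>>>>>>>>>>> >             future = self._pool.submit(process_bundle_fn)
>>>>>>>>>>>>> >             future.add_done_callback(lambda _: self._done())
>>>>>>>>>>>>> >             return future
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >         def _done(self):
>>>>>>>>>>>>> >             with self._lock:
>>>>>>>>>>>>> >                 self._active -= 1
>>>>>>>>>>>>> >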
>>>>>>>>>>>>> > Please let me know if I am missing something and your
>>>>>>>>>>>>> thoughts on the
>>>>>>>>>>>>> > approach.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>
