Ankur, how would you expect an SDK to compute a realistic upper bound
(upfront or during pipeline computation)?

My first thought was that the SDK would provide CPU/memory/... resourcing
information and the runner would make a judgement call as to whether it
should ask the SDK to do more work or less, rather than an explicit "don't
do more than X bundles in parallel".
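
To make that concrete, here is a hypothetical sketch (none of these names
exist in the Beam APIs; they are purely illustrative) of a runner deriving
a soft concurrency target from resource figures an SDK could report:

from dataclasses import dataclass

@dataclass
class HarnessResources:
    cpu_cores: float       # CPUs given to the harness
    memory_bytes: int      # memory given to the harness
    bytes_per_bundle: int  # rough per-bundle working-set estimate

def soft_concurrency_target(res: HarnessResources) -> int:
    # A judgement call by the runner, not a hard "no more than X
    # bundles in parallel": stay inside memory while keeping the
    # CPUs reasonably busy.
    by_memory = max(1, res.memory_bytes // res.bytes_per_bundle)
    by_cpu = max(1, int(2 * res.cpu_cores))  # slack for I/O waits
    return min(by_memory, by_cpu)

Since some bundles take far more resources than others, any such value is
only an estimate that the runner could revise as it observes the harness.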

On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com> wrote:

> Makes sense. Exposing an upper bound on concurrency along with an optimum
> concurrency can give a good balance. This is good information to expose
> while keeping the requirements on the SDK simple. An SDK can publish 1 as
> both the optimum concurrency and the upper bound to keep things simple.
>
> Runner introspection of the upper bound on concurrency is important for
> correctness, while introspection of the optimum concurrency is important
> for efficiency. This separates the efficiency and correctness requirements.
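>
> As a sketch, assuming a hypothetical capabilities record (this is not an
> existing Fn API proto; the field names are made up):
>
> from dataclasses import dataclass
>
> @dataclass
> class HarnessConcurrency:
>     optimum_bundles: int  # efficiency hint; the runner may ignore it
>     max_bundles: int      # correctness bound; the runner must respect it
>
> # The simplest SDK keeps things simple by publishing:
> simplest = HarnessConcurrency(optimum_bundles=1, max_bundles=1)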
>
> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com> wrote:
>
>> I agree with Luke's observation, with the caveat that an "infinite number
>> of bundles in parallel" is limited by the available resources. For example,
>> the Go SDK harness will accept an arbitrary amount of parallel work, but
>> too much work will cause either excessive GC pressure with crippling
>> slowness or an outright OOM. Unless it's always 1, a reasonable upper bound
>> will either have to be provided by the user or computed from the mem/cpu
>> resources given. Of course, some bundles take more resources than others,
>> so any static value will be an estimate or ignore resource limits.
>>
>> That said, I do not like that an "efficiency" aspect becomes a subtle
>> requirement for correctness due to Flink internals. I fear that road leads
>> to trouble.
>>
>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com> wrote:
>>
>>> The latter case, where the SDK supports only single-bundle execution at a
>>> time and the runner does not use this flag, is exactly the reason we got
>>> into the deadlock here.
>>> I agree with exposing the SDK's optimum concurrency level (1 in the latter
>>> case) and letting the runner decide whether to use it. But at the same
>>> time I expect the SDK to handle an unbounded number of bundles even if
>>> it's not efficient.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I believe in practice SDK harnesses will fall into one of two
>>>> capabilities: they can process an effectively infinite number of bundles
>>>> in parallel, or they can only process a single bundle at a time.
>>>>
>>>> I believe it is more difficult for a runner to handle the latter case
>>>> well and to perform all the environment management that would make it
>>>> efficient. It may be inefficient for an SDK, but I do believe it should
>>>> be able to say "I'm not great at anything more than a single bundle at a
>>>> time", while a runner's use of that information should be optional.
>>>>
>>>>
>>>>
>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com> wrote:
>>>>
>>>>> To recap the discussion, it seems that we have come up with the
>>>>> following points.
>>>>> SDKHarness Management and initialization.
>>>>>
>>>>>    1. The runner completely owns the assignment of work to the
>>>>>    SDKHarness.
>>>>>    2. The runner should know the capabilities and capacity of the
>>>>>    SDKHarness and should assign work accordingly.
>>>>>    3. Spinning up the SDKHarness is the runner's responsibility, and it
>>>>>    can be done statically (a fixed, pre-configured number of
>>>>>    SDKHarnesses), dynamically, or based on whatever other
>>>>>    configuration/logic the runner chooses.
>>>>>
>>>>> SDKHarness expectations. These are more in question, and we should
>>>>> outline the responsibilities of the SDKHarness.
>>>>>
>>>>>    1. The SDKHarness should publish how many concurrent tasks it can
>>>>>    execute.
>>>>>    2. The SDKHarness should start executing all assigned tasks in
>>>>>    parallel in a timely manner, or fail the task.
>>>>>
>>>>> Also, on the simplification side: I think that for better adoption we
>>>>> should have a simple SDKHarness as well as simple runner integration, to
>>>>> encourage integration with more runners. Many runners might not expose
>>>>> some of their internal scheduling characteristics, so we should not make
>>>>> scheduling characteristics a requirement for runner integration.
>>>>> Moreover, scheduling characteristics can change based on pipeline type,
>>>>> infrastructure, available resources, etc. So I am a bit hesitant to bake
>>>>> runner scheduling specifics into the integration.
>>>>> A good balance between SDKHarness complexity and runner integration
>>>>> effort can make adoption easier.
>>>>>
>>>>> Thanks,
>>>>> Ankur
>>>>>
>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <hero...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Finding a good balance is indeed the art of portability, because the
>>>>>> range of capabilities (and assumptions) on both sides is wide.
>>>>>>
>>>>>> The original idea was to allow the SDK harness to be an extremely
>>>>>> simple bundle executor (specifically, single-threaded execution, one
>>>>>> instruction at a time), however inefficient -- a more sophisticated SDK
>>>>>> harness would support more features and be more efficient. For the issue
>>>>>> described here, it seems problematic to me to send non-executable
>>>>>> bundles to the SDK harness under the expectation that the SDK harness
>>>>>> will concurrently work its way deeply enough down the instruction queue
>>>>>> to unblock itself. That would be an extremely subtle requirement for SDK
>>>>>> authors, and one practical question becomes: what should an SDK do with
>>>>>> a bundle instruction that it doesn't have the capacity to execute? If a
>>>>>> runner needs to make such assumptions, I think that information should
>>>>>> probably rather be explicit, along the lines of proposal 1 -- i.e., some
>>>>>> kind of negotiation between the resources allotted to the SDK harness (a
>>>>>> preliminary variant is in the provisioning API) and what the SDK harness
>>>>>> in return can do (and a valid answer might be: 1 bundle at a time,
>>>>>> irrespective of the resources given), or a per-bundle special
>>>>>> "overloaded" error response. For other aspects, such as side input
>>>>>> readiness, the runner handles that complexity, and the overall bias has
>>>>>> generally been to move complexity to the runner side.
>>>>>>
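>>>>>> As a sketch of that last alternative (the message shapes below are
>>>>>> hypothetical, not actual Fn API protos), the harness would reject work
>>>>>> it cannot run instead of queueing it:
>>>>>>
>>>>>> def handle_process_bundle(request, active, max_bundles, run_bundle):
>>>>>>     if len(active) >= max_bundles:
>>>>>>         # Explicit back-pressure: the runner learns immediately that
>>>>>>         # this harness is at capacity, instead of deadlocking on a
>>>>>>         # bundle sitting in a queue that will never drain.
>>>>>>         return {"instruction_id": request["instruction_id"],
>>>>>>                 "error": "overloaded"}
>>>>>>     active.add(request["instruction_id"])
>>>>>>     try:
>>>>>>         return run_bundle(request)
>>>>>>     finally:
>>>>>>         active.discard(request["instruction_id"])
>>>>>>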
>>>>>> The SDK harness and initialization overhead is entirely SDK-, job-type-
>>>>>> and even pipeline-specific. A docker container is also just a process,
>>>>>> btw, and doesn't inherently carry much overhead. That said, on a single
>>>>>> host, a static docker configuration is generally a lot simpler to work
>>>>>> with.
>>>>>>
>>>>>> Henning
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> It is good to see this discussed!
>>>>>>>
>>>>>>> I think there needs to be a good balance between the SDK harness's
>>>>>>> capabilities/complexity and its responsibilities. Additionally, the
>>>>>>> user will need to be able to adjust the runner behavior, since the type
>>>>>>> of workload executed in the harness is also a factor. Elsewhere we
>>>>>>> already discussed that the current assumption of a single SDK harness
>>>>>>> instance per Flink task manager brings problems with it, and that there
>>>>>>> needs to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>
>>>>>>> There was the concern that instantiation of multiple SDK harnesses
>>>>>>> per TM host is expensive (resource usage, initialization time, etc.).
>>>>>>> That may hold true for a specific scenario, such as batch workloads and
>>>>>>> the use of Docker containers. But it may look totally different for a
>>>>>>> streaming topology, or when the SDK harness is just a process on the
>>>>>>> same host.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> SDK harnesses have always been responsible for executing all work
>>>>>>>> given to them concurrently. Runners have been responsible for choosing
>>>>>>>> how much work to give to the SDK harness in such a way that best
>>>>>>>> utilizes it.
>>>>>>>>
>>>>>>>> I understand that multithreading in Python is inefficient due to the
>>>>>>>> global interpreter lock; it would be up to the runner in this case to
>>>>>>>> make sure that the amount of work it gives to each SDK harness best
>>>>>>>> utilizes it, while spinning up an appropriate number of SDK harnesses.
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <m...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ankur,
>>>>>>>>>
>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's
>>>>>>>>> pipelined execution mode. It runs multiple tasks in one task slot
>>>>>>>>> and
>>>>>>>>> produces a deadlock when the pipelined operators schedule the SDK
>>>>>>>>> harness DoFns in non-topological order.
>>>>>>>>>
>>>>>>>>> The problem would be resolved if we scheduled the tasks in
>>>>>>>>> topological
>>>>>>>>> order. Doing that is not easy because they run in separate Flink
>>>>>>>>> operators and the SDK Harness would have to have insight into the
>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>
>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the
>>>>>>>>> number of threads in the SDK harness matches the number of
>>>>>>>>> ExecutableStage DoFn operators.
>>>>>>>>>
>>>>>>>>> The approach in 2) is what Flink does as well: it glues together
>>>>>>>>> horizontal parts of the execution graph, also in multiple threads. So
>>>>>>>>> I agree with your proposed solution.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > tl;dr: Deadlock in task execution caused by limited task
>>>>>>>>> > parallelism on the SDKHarness.
>>>>>>>>> >
>>>>>>>>> > *Setup:*
>>>>>>>>> >
>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ Job on Flink
>>>>>>>>> >     standalone cluster.
>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>> >   * Everything is running on a single machine with single Flink
>>>>>>>>> >     task manager.
>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>> >
>>>>>>>>> > *Example pipeline:*
>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>> >
>>>>>>>>> > *Issue:*
>>>>>>>>> > With a multi-stage job, Flink schedules the dependent sub-tasks
>>>>>>>>> > concurrently on the Flink worker as long as it can get slots. Each
>>>>>>>>> > map task is then executed on the SDKHarness.
>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and
>>>>>>>>> > hence gets into the execution queue before MapA. Because we only
>>>>>>>>> > have 1 execution thread on the SDKHarness, MapA will never get a
>>>>>>>>> > chance to execute, as MapB will never release the execution thread
>>>>>>>>> > while it waits for input from MapA. This gets us to a deadlock in
>>>>>>>>> > a simple pipeline.
>>>>>>>>> >
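>>>>>>>>> > The stall can be reproduced in miniature with a single-thread
>>>>>>>>> > executor standing in for the 1-thread SDKHarness (illustrative
>>>>>>>>> > code, not Beam's):
>>>>>>>>> >
>>>>>>>>> > import threading
>>>>>>>>> > from concurrent.futures import ThreadPoolExecutor
>>>>>>>>> >
>>>>>>>>> > executor = ThreadPoolExecutor(max_workers=1)  # 1-thread "harness"
>>>>>>>>> > map_a_ran = threading.Event()
>>>>>>>>> >
>>>>>>>>> > def map_b():
>>>>>>>>> >     # MapB holds the only thread while waiting for MapA's output.
>>>>>>>>> >     if not map_a_ran.wait(timeout=5):
>>>>>>>>> >         return "deadlock: MapA never got the thread"
>>>>>>>>> >     return "ok"
>>>>>>>>> >
>>>>>>>>> > def map_a():
>>>>>>>>> >     map_a_ran.set()
>>>>>>>>> >
>>>>>>>>> > fb = executor.submit(map_b)  # arrives first, takes the thread
>>>>>>>>> > fa = executor.submit(map_a)  # queued behind MapB, never starts
>>>>>>>>> > print(fb.result())  # only the 5s timeout breaks the stall here
>>>>>>>>> >
>>>>>>>>> > A real harness has no such timeout, so the pipeline hangs forever.
>>>>>>>>> >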
>>>>>>>>> > *Mitigation:*
>>>>>>>>> > Set worker_count in the pipeline options to more than the expected
>>>>>>>>> > number of sub-tasks in the pipeline.
>>>>>>>>> >
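>>>>>>>>> > For example (a sketch; treat the exact option spelling as an
>>>>>>>>> > assumption, it may differ between Beam versions):
>>>>>>>>> >
>>>>>>>>> > from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>> >
>>>>>>>>> > # Enough harness threads for every concurrently scheduled sub-task.
>>>>>>>>> > options = PipelineOptions(['--worker_count=16'])
>>>>>>>>> >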
>>>>>>>>> > *Proposal:*
>>>>>>>>> >
>>>>>>>>> >  1. We can get the maximum concurrency from the runner and make
>>>>>>>>> >     sure that we have more threads than the max concurrency. This
>>>>>>>>> >     approach assumes that Beam has insight into the runner's
>>>>>>>>> >     execution plan and can make decisions based on it.
>>>>>>>>> >  2. We dynamically create threads and cache them, with a high
>>>>>>>>> >     upper bound, in the SDKHarness (see the sketch after this
>>>>>>>>> >     list). We can warn if we are hitting the thread upper bound.
>>>>>>>>> >     This approach assumes that the runner does a good job of
>>>>>>>>> >     scheduling and will distribute tasks more or less evenly.
>>>>>>>>> >
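>>>>>>>>> > A minimal sketch of option 2 (the class and names are
>>>>>>>>> > illustrative, not the actual SDKHarness code):
>>>>>>>>> >
>>>>>>>>> > import logging
>>>>>>>>> > import threading
>>>>>>>>> > from concurrent.futures import ThreadPoolExecutor
>>>>>>>>> >
>>>>>>>>> > UPPER_BOUND = 500  # high static cap on cached worker threads
>>>>>>>>> >
>>>>>>>>> > class BundleExecutor:
>>>>>>>>> >     def __init__(self):
>>>>>>>>> >         # ThreadPoolExecutor creates threads lazily and keeps them
>>>>>>>>> >         # cached, so the pool grows with actual demand rather than
>>>>>>>>> >         # being provisioned for the peak requirement.
>>>>>>>>> >         self._pool = ThreadPoolExecutor(max_workers=UPPER_BOUND)
>>>>>>>>> >         self._lock = threading.Lock()
>>>>>>>>> >         self._active = 0
>>>>>>>>> >
>>>>>>>>> >     def submit(self, process_bundle, instruction):
>>>>>>>>> >         with self._lock:
>>>>>>>>> >             self._active += 1
>>>>>>>>> >             if self._active >= 0.8 * UPPER_BOUND:
>>>>>>>>> >                 logging.warning('nearing thread upper bound: %d',
>>>>>>>>> >                                 self._active)
>>>>>>>>> >         future = self._pool.submit(process_bundle, instruction)
>>>>>>>>> >         future.add_done_callback(lambda _: self._done())
>>>>>>>>> >         return future
>>>>>>>>> >
>>>>>>>>> >     def _done(self):
>>>>>>>>> >         with self._lock:
>>>>>>>>> >             self._active -= 1
>>>>>>>>> >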
>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It
>>>>>>>>> > is simpler to implement, and the implementation is not runner
>>>>>>>>> > specific. This approach utilizes resources better, as it creates
>>>>>>>>> > only as many threads as needed instead of provisioning for the
>>>>>>>>> > peak thread requirement.
>>>>>>>>> > And last but not least, it gives the runner control over managing
>>>>>>>>> > the truly active tasks.
>>>>>>>>> >
>>>>>>>>> > Please let me know if I am missing something, and your thoughts on
>>>>>>>>> > the approach.
>>>>>>>>> >
>>>>>>>>> > Thanks,
>>>>>>>>> > Ankur
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
