Sounds good to me.
The gRPC header of the control channel seems to be a good place to add the upper
bound information.
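
As a rough illustration of the kind of upper bound discussed below, here is a
minimal Python sketch. It uses the startup-time formula quoted further down,
max(1, min(cpu*100, mem_mb/10)), and rejects bundles beyond the limit instead
of queueing them. The names max_bundles_in_flight and BundleAdmission are
hypothetical and are not part of any Beam API.

```python
import threading


def max_bundles_in_flight(cpu_cores, mem_mb, user_override=None):
    """Static upper bound on concurrent bundles (hypothetical helper).

    Follows the formula from the thread: max(1, min(cpu*100, mem_mb/10)).
    A user-provided value, if given, takes precedence.
    """
    if user_override is not None:
        return max(1, int(user_override))
    return max(1, min(int(cpu_cores * 100), int(mem_mb // 10)))


class BundleAdmission:
    """Rejects bundles beyond the limit rather than queueing them."""

    def __init__(self, limit):
        self.limit = limit
        self._in_flight = 0
        self._lock = threading.Lock()

    def try_start(self):
        """Return True if the bundle is admitted, False if it is rejected."""
        with self._lock:
            if self._in_flight >= self.limit:
                return False
            self._in_flight += 1
            return True

    def finish(self):
        """Mark one admitted bundle as completed."""
        with self._lock:
            self._in_flight -= 1
```

The limit would be computed once at startup and published to the runner; a
well-behaved runner would then never trigger the rejection path.
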
Added JIRAs:
https://issues.apache.org/jira/browse/BEAM-5166
https://issues.apache.org/jira/browse/BEAM-5167
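
For anyone reading the thread top-down, the deadlock from the original report
(quoted at the bottom) can be reproduced in miniature. This is only an
illustrative sketch, not Beam or Flink code: a thread pool stands in for the
SDK harness, a queue stands in for the data flowing from MapA to MapB, and a
timeout replaces the real deadlock so that the example terminates. The
function run_pipeline is a made-up name.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(num_threads, wait_seconds=0.5):
    """Submit MapB before MapA to a pool with num_threads workers.

    Returns MapB's output, or None if MapB timed out waiting for MapA
    (the timeout stands in for the deadlock).
    """
    channel = queue.Queue()

    def map_a():
        # Upstream stage: produces the element MapB is waiting for.
        channel.put("element")

    def map_b():
        # Downstream stage: blocks its worker thread until input arrives.
        try:
            return channel.get(timeout=wait_seconds).upper()
        except queue.Empty:
            return None

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # Non-topological order: the downstream stage is scheduled first.
        b_future = pool.submit(map_b)
        pool.submit(map_a)
        return b_future.result()
```

With one worker thread MapB holds the only thread and MapA never gets to run
until MapB gives up; with two threads both stages make progress, which is why
raising worker_count works as a mitigation.
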

On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <hero...@google.com> wrote:

> Regarding resources: the runner can currently dictate the mem/cpu/disk
> resources that the harness is allowed to use via the provisioning api. The
> SDK harness need not -- and should not -- speculate on what else might be
> running on the machine:
>
>
> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>
> A realistic startup-time computation in the SDK harness would be something
> simple like max(1, min(cpu*100, mem_mb/10)), say, with at most that number
> of threads in use. Or just hardcode it to 300. Or use a user-provided value.
> Whatever the value, it is the maximum number of bundles allowed in flight at
> any given time, and it needs to be communicated to the runner via some message.
> Anything beyond that would be rejected (but this shouldn't happen, because the
> runner should respect that number).
>
> A dynamic computation would use the same limits from the SDK, but take
> into account its own resource usage (incl. the usage by running bundles).
>
> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <goe...@google.com> wrote:
>
>> I am thinking of the upper bound more along the lines of a theoretical upper
>> limit, or any other static high value, beyond which the SDK will reject
>> bundles verbosely. The idea is that the SDK will not keep bundles in a queue
>> while waiting for current bundles to finish. It will simply reject any
>> additional bundle.
>> Beyond this, I don't have a good answer for a dynamic upper bound. As the SDK
>> does not have the complete picture of the processes on the machine with which
>> it shares resources, resources might not be a good proxy for the upper bound
>> from the SDK's point of view.
>>
>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Ankur, how would you expect an SDK to compute a realistic upper bound
>>> (upfront or during pipeline computation)?
>>>
>>> My first thought was that the SDK would provide CPU/memory/... resourcing
>>> information and the runner would make a judgement call as to whether it
>>> should ask the SDK to do more work or less, but that is not an explicit
>>> "don't do more than X bundles in parallel".
>>>
>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com> wrote:
>>>
>>>> Makes sense. Exposing an upper bound on concurrency together with the
>>>> optimum concurrency can give a good balance. This is good information to
>>>> expose while keeping the requirements on the SDK simple. An SDK can publish
>>>> 1 as both the optimum concurrency and the upper bound to keep things simple.
>>>>
>>>> Runner introspection of upper bound on concurrency is important for
>>>> correctness while introspection of optimum concurrency is important for
>>>> efficiency. This separates efficiency and correctness requirements.
>>>>
>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com>
>>>> wrote:
>>>>
>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>> amount of bundles in parallel" is limited by the available resources. For
>>>>> example, the Go SDK harness will accept an arbitrary amount of parallel
>>>>> work, but too much work will cause either excessive GC pressure with
>>>>> crippling slowness or an outright OOM. Unless it's always 1, a reasonable
>>>>> upper bound will either have to be provided by the user or computed from
>>>>> the mem/cpu resources given. Of course, as some bundles take more
>>>>> resources than others, any static value will either be an estimate or
>>>>> ignore resource limits.
>>>>>
>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle
>>>>> requirement for correctness due to Flink internals. I fear that road leads
>>>>> to trouble.
>>>>>
>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com>
>>>>> wrote:
>>>>>
>>>>>> The latter case, where the SDK supports executing only a single bundle
>>>>>> at a time and the runner does not use this flag, is exactly how we got
>>>>>> into the deadlock here.
>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the
>>>>>> latter case) and letting the runner decide whether to use it. But at the
>>>>>> same time, we should expect the SDK to handle an infinite number of
>>>>>> bundles, even if it's not efficient.
>>>>>>
>>>>>> Thanks,
>>>>>> Ankur
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I believe in practice SDK harnesses will fall into one of two
>>>>>>> categories: they can process effectively an infinite number of bundles
>>>>>>> in parallel, or they can only process a single bundle at a time.
>>>>>>>
>>>>>>> I believe it is more difficult for a runner to handle the latter
>>>>>>> case well and to perform all the environment management that would make
>>>>>>> that efficient. It may be inefficient for an SDK, but I do believe it
>>>>>>> should be able to say "I'm not great at anything more than a single
>>>>>>> bundle at a time", while use of this information by a runner should be
>>>>>>> optional.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To recap the discussion, it seems that we have come up with the
>>>>>>>> following points.
>>>>>>>> SDKHarness management and initialization.
>>>>>>>>
>>>>>>>>    1. The runner completely owns the work assignment to the SDKHarness.
>>>>>>>>    2. The runner should know the capabilities and capacity of the
>>>>>>>>    SDKHarness and should assign work accordingly.
>>>>>>>>    3. Spinning up the SDKHarness is the runner's responsibility, and it
>>>>>>>>    can be done statically (a fixed, preconfigured number of
>>>>>>>>    SDKHarnesses), dynamically, or based on other configuration/logic
>>>>>>>>    that the runner chooses.
>>>>>>>>
>>>>>>>> SDKHarness expectations. This is more in question, and we should
>>>>>>>> outline the responsibilities of the SDKHarness.
>>>>>>>>
>>>>>>>>    1. The SDKHarness should publish how many concurrent tasks it can
>>>>>>>>    execute.
>>>>>>>>    2. The SDKHarness should start executing all assigned task items
>>>>>>>>    in parallel in a timely manner, or fail the task.
>>>>>>>>
>>>>>>>> Also, on the simplification side: I think that for better adoption we
>>>>>>>> should have a simple SDKHarness as well as simple runner integration,
>>>>>>>> to encourage integration with more runners. Many runners might not
>>>>>>>> expose some of their internal scheduling characteristics, so we should
>>>>>>>> not depend on scheduling characteristics for runner integration.
>>>>>>>> Moreover, scheduling characteristics can change based on pipeline type,
>>>>>>>> infrastructure, available resources, etc. So I am a bit hesitant to add
>>>>>>>> runner scheduling specifics for runner integration.
>>>>>>>> A good balance between SDKHarness complexity and runner integration
>>>>>>>> can make adoption easier.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ankur
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <hero...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Finding a good balance is indeed the art of portability, because
>>>>>>>>> the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>
>>>>>>>>> It was originally the idea to allow the SDK harness to be an
>>>>>>>>> extremely simple bundle executor (specifically, single-threaded
>>>>>>>>> execution, one instruction at a time), however inefficient -- a more
>>>>>>>>> sophisticated SDK harness would support more features and be more
>>>>>>>>> efficient. For the issue described here, it seems problematic to me to
>>>>>>>>> send non-executable bundles to the SDK harness under the expectation
>>>>>>>>> that the SDK harness will concurrently work its way deeply enough down
>>>>>>>>> the instruction queue to unblock itself. That would be an extremely
>>>>>>>>> subtle requirement for SDK authors, and one practical question becomes:
>>>>>>>>> what should an SDK do with a bundle instruction that it doesn't have
>>>>>>>>> capacity to execute? If a runner needs to make such assumptions, I
>>>>>>>>> think that information should probably rather be explicit along the
>>>>>>>>> lines of proposal 1 -- i.e., some kind of negotiation between the
>>>>>>>>> resources allotted to the SDK harness (a preliminary variant is in the
>>>>>>>>> provisioning api) and what the SDK harness can do in return (and a
>>>>>>>>> valid answer might be: 1 bundle at a time irrespective of the
>>>>>>>>> resources given), or a special per-bundle "overloaded" error response.
>>>>>>>>> For other aspects, such as side input readiness, the runner handles
>>>>>>>>> that complexity, and the overall bias has generally been to move
>>>>>>>>> complexity to the runner side.
>>>>>>>>>
>>>>>>>>> The SDK harness and initialization overhead is entirely SDK, job
>>>>>>>>> type and even pipeline specific. A docker container is also just a
>>>>>>>>> process, btw, and doesn't inherently carry much overhead. That said,
>>>>>>>>> on a single host, a static docker configuration is generally a lot
>>>>>>>>> simpler to work with.
>>>>>>>>>
>>>>>>>>> Henning
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>
>>>>>>>>>> I think there needs to be a good balance between the SDK harness
>>>>>>>>>> capabilities/complexity and responsibilities. Additionally, the user
>>>>>>>>>> will need to be able to adjust the runner behavior, since the type of
>>>>>>>>>> workload executed in the harness is also a factor. Elsewhere we
>>>>>>>>>> already discussed that the current assumption of a single SDK harness
>>>>>>>>>> instance per Flink task manager brings problems with it, and that
>>>>>>>>>> there needs to be more than one way for the runner to spin up SDK
>>>>>>>>>> harnesses.
>>>>>>>>>>
>>>>>>>>>> There was the concern that instantiation of multiple SDK
>>>>>>>>>> harnesses per TM host is expensive (resource usage, initialization
>>>>>>>>>> time, etc.). That may hold true for a specific scenario, such as batch
>>>>>>>>>> workloads and the use of Docker containers. But it may look totally
>>>>>>>>>> different for a streaming topology or when the SDK harness is just a
>>>>>>>>>> process on the same host.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> SDK harnesses were always responsible for executing all work
>>>>>>>>>>> given to them concurrently. Runners have been responsible for
>>>>>>>>>>> choosing how much work to give to the SDK harness in such a way that
>>>>>>>>>>> best utilizes the SDK harness.
>>>>>>>>>>>
>>>>>>>>>>> I understand that multithreading in Python is inefficient due to
>>>>>>>>>>> the global interpreter lock. It would be up to the runner in this
>>>>>>>>>>> case to make sure that the amount of work it gives to each SDK
>>>>>>>>>>> harness best utilizes it, while spinning up an appropriate number of
>>>>>>>>>>> SDK harnesses.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's
>>>>>>>>>>>> pipelined execution mode. It runs multiple tasks in one task slot and
>>>>>>>>>>>> produces a deadlock when the pipelined operators schedule the SDK
>>>>>>>>>>>> harness DoFns in non-topological order.
>>>>>>>>>>>>
>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological
>>>>>>>>>>>> order. Doing that is not easy because they run in separate Flink
>>>>>>>>>>>> operators and the SDK Harness would have to have insight into the
>>>>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>>>>
>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the
>>>>>>>>>>>> number of threads in the SDK harness matches the number of
>>>>>>>>>>>> ExecutableStage DoFn operators.
>>>>>>>>>>>>
>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues together
>>>>>>>>>>>> horizontal parts of the execution graph, also in multiple threads. So I
>>>>>>>>>>>> agree with your proposed solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Max
>>>>>>>>>>>>
>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>> > Hi,
>>>>>>>>>>>> >
>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task parallelism
>>>>>>>>>>>> > on SDKHarness.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>> >
>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ Job on Flink standalone
>>>>>>>>>>>> >     cluster.
>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>> >   * Everything is running on a single machine with a single Flink
>>>>>>>>>>>> >     task manager.
>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>> > With a multi-stage job, Flink schedules different dependent subtasks
>>>>>>>>>>>> > concurrently on the Flink worker as long as it can get slots. Each
>>>>>>>>>>>> > map task is then executed on the SDKHarness.
>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and hence
>>>>>>>>>>>> > gets into the execution queue before MapA. Because we only have 1
>>>>>>>>>>>> > execution thread on the SDKHarness, MapA will never get a chance to
>>>>>>>>>>>> > execute, as MapB will never release the execution thread. MapB will
>>>>>>>>>>>> > wait for input from MapA. This gets us to a deadlock in a simple
>>>>>>>>>>>> > pipeline.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the expected
>>>>>>>>>>>> > number of subtasks in the pipeline.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>> >
>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner and make sure
>>>>>>>>>>>> >     that we have more threads than the max concurrency. This approach
>>>>>>>>>>>> >     assumes that Beam has insight into the runner's execution plan and
>>>>>>>>>>>> >     can make decisions based on it.
>>>>>>>>>>>> >  2. We dynamically create threads and cache them, with a high upper
>>>>>>>>>>>> >     bound, in the SDKHarness. We can warn if we are hitting the upper
>>>>>>>>>>>> >     bound of threads. This approach assumes that the runner does a
>>>>>>>>>>>> >     good job of scheduling and will distribute tasks more or less
>>>>>>>>>>>> >     evenly.
>>>>>>>>>>>> >
>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is
>>>>>>>>>>>> > simpler to implement and the implementation is not runner specific.
>>>>>>>>>>>> > This approach utilizes resources better, as it creates only as many
>>>>>>>>>>>> > threads as needed instead of the peak thread requirement.
>>>>>>>>>>>> > And last but not least, it gives the runner control over managing
>>>>>>>>>>>> > truly active tasks.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Please let me know if I am missing something, and share your thoughts
>>>>>>>>>>>> > on the approach.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>> > Ankur
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
