Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?
The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information and the runner would make a judgement call as to whether it should ask the SDK to do more or less work -- but it's not an explicit "don't do more than X bundles in parallel".

On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com> wrote:

> Makes sense. Exposing an upper bound on concurrency together with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple; an SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.
>
> Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.
>
> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com> wrote:
>
>> I agree with Luke's observation, with the caveat that "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, since some bundles take more resources than others, any static value will either be an estimate or ignore resource limits.
>>
>> That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.
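>>
>> To make the resource-derived bound concrete, a back-of-the-envelope sketch (the numbers and the per-bundle budget are made up; a real bound would use the actual figures from the provision API):
>>
>>     import os
>>
>>     # Hypothetical sizing: cap parallel bundles by available CPUs and by
>>     # an assumed per-bundle memory budget. Any such static value is only
>>     # an estimate, since bundles differ in their resource needs.
>>     def bundle_upper_bound(mem_limit_bytes, mem_per_bundle=256 << 20):
>>         cpu_bound = os.cpu_count() or 1
>>         mem_bound = max(1, mem_limit_bytes // mem_per_bundle)
>>         return min(cpu_bound, mem_bound)
>>
>>     # e.g. a harness given 4 GiB: bundle_upper_bound(4 << 30) -> min(cpus, 16)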
>>
>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com> wrote:
>>
>>> The latter case -- an SDK supporting only single-bundle execution at a time and the runner not using that flag -- is exactly how we got into the deadlock here.
>>> I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether to use it, but at the same time expecting the SDK to handle an unbounded number of bundles even if that is not efficient.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I believe in practice SDK harnesses will fall into one of two capability classes: able to process an effectively infinite number of bundles in parallel, or able to process only a single bundle at a time.
>>>>
>>>> I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make it efficient. It may be inefficient for an SDK, but I do believe it should be able to say "I'm not great at anything more than a single bundle at a time" -- though a runner's use of this information should be optional.
>>>>
>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com> wrote:
>>>>
>>>>> To recap the discussion, it seems we have come up with the following points.
>>>>>
>>>>> SDKHarness management and initialization:
>>>>>
>>>>> 1. The runner completely owns work assignment to the SDKHarness.
>>>>> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
>>>>> 3. Spinning up the SDKHarness is the runner's responsibility; it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>>>>>
>>>>> SDKHarness expectations. This is more in question, and we should outline the SDKHarness's responsibilities (a rough sketch follows the list):
>>>>>
>>>>> 1. The SDKHarness should publish how many concurrent tasks it can execute.
>>>>> 2. The SDKHarness should start executing all assigned tasks in parallel in a timely manner, or fail them.
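>>>>>
>>>>> For illustration, a rough sketch of what those two expectations could look like in an SDKHarness control loop (the names and the published value are hypothetical):
>>>>>
>>>>>     import threading
>>>>>     from concurrent.futures import ThreadPoolExecutor
>>>>>
>>>>>     PUBLISHED_CONCURRENCY = 12  # value the harness would report to the runner
>>>>>
>>>>>     _slots = threading.BoundedSemaphore(PUBLISHED_CONCURRENCY)
>>>>>     _pool = ThreadPoolExecutor(max_workers=PUBLISHED_CONCURRENCY)
>>>>>
>>>>>     def on_process_bundle(bundle, process_fn):
>>>>>         # Expectation 2: start promptly or fail -- never queue a bundle
>>>>>         # behind in-flight work, which is what produces the deadlock.
>>>>>         if not _slots.acquire(blocking=False):
>>>>>             raise RuntimeError('SDKHarness overloaded: no free execution slot')
>>>>>         future = _pool.submit(process_fn, bundle)
>>>>>         future.add_done_callback(lambda _: _slots.release())
>>>>>         return future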
>>>>> Also, to add on the simplification side: I think that for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose their internal scheduling characteristics, so we should not expect scheduling characteristics as part of runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to bake runner scheduling specifics into the integration.
>>>>> A good balance between SDKHarness complexity and runner integration can make adoption easier.
>>>>>
>>>>> Thanks,
>>>>> Ankur
>>>>>
>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <hero...@google.com> wrote:
>>>>>
>>>>>> Finding a good balance is indeed the art of portability, because the range of capability (and assumptions) on both sides is wide.
>>>>>>
>>>>>> It was originally the idea to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that it will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have capacity to execute? If a runner needs to make such assumptions, I think that information should rather be explicit, along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning API) and what the SDK harness can do in return (a valid answer might be: 1 bundle at a time, irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
>>>>>>
>>>>>> SDK harness and initialization overhead is entirely SDK-, job-type- and even pipeline-specific. A Docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static Docker configuration is generally a lot simpler to work with.
>>>>>>
>>>>>> Henning
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> It is good to see this discussed!
>>>>>>>
>>>>>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>
>>>>>>> There was the concern that instantiating multiple SDK harnesses per TM host is expensive (resource usage, initialization time, etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology, or when the SDK harness is just a process on the same host.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> SDK harnesses have always been responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in a way that best utilizes it.
>>>>>>>>
>>>>>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; in this case it would be up to the runner to make sure that the amount of work it gives each SDK harness utilizes it well, while spinning up an appropriate number of SDK harnesses.
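>>>>>>>>
>>>>>>>> As a toy example of that runner-side decision (the function and numbers are hypothetical, not an actual runner API):
>>>>>>>>
>>>>>>>>     import math
>>>>>>>>
>>>>>>>>     # If a GIL-bound SDK reports an optimum concurrency of 1, the runner
>>>>>>>>     # compensates with more harness instances rather than more threads.
>>>>>>>>     def plan_environments(target_parallelism, sdk_optimum_concurrency):
>>>>>>>>         harnesses = math.ceil(target_parallelism / sdk_optimum_concurrency)
>>>>>>>>         return harnesses, sdk_optimum_concurrency
>>>>>>>>
>>>>>>>>     plan_environments(8, 1)  # Python-like SDK -> (8, 1): eight harnesses
>>>>>>>>     plan_environments(8, 8)  # multithreaded SDK -> (1, 8): one harness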
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Ankur,
>>>>>>>>>
>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>>>>>
>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy, because they run in separate Flink operators and the SDK harness would have to have insight into the execution graph (which is not desirable).
>>>>>>>>>
>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>>>>>
>>>>>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task parallelism on the SDKHarness.
>>>>>>>>> >
>>>>>>>>> > *Setup:*
>>>>>>>>> >
>>>>>>>>> > * Job type: /*Beam Portable Python Batch*/ job on a Flink standalone cluster.
>>>>>>>>> > * Only a single job is scheduled on the cluster.
>>>>>>>>> > * Everything is running on a single machine with a single Flink task manager.
>>>>>>>>> > * Flink Task Manager Slots is 1.
>>>>>>>>> > * Flink Parallelism is 1.
>>>>>>>>> > * The Python SDKHarness has 1 thread.
>>>>>>>>> >
>>>>>>>>> > *Example pipeline:*
>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>> >
>>>>>>>>> > *Issue:*
>>>>>>>>> > With a multi-stage job, Flink schedules dependent subtasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness. It's possible that MapB gets to the SDKHarness before MapA and hence enters the execution queue first. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread while it waits for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>>>>>> >
>>>>>>>>> > *Mitigation:*
>>>>>>>>> > Set worker_count in the pipeline options to more than the expected number of subtasks in the pipeline.
>>>>>>>>> >
>>>>>>>>> > *Proposal:*
>>>>>>>>> >
>>>>>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>>>>>> > 2. We dynamically create threads in the SDKHarness and cache them, with a high upper bound. We can warn when we hit the thread upper bound. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>>>>>> >
>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement, and the implementation is not runner-specific. This approach also utilizes resources better, as it creates only as many threads as needed instead of the peak thread requirement. And last but not least, it gives the runner control over managing the truly active tasks.
>>>>>>>>> >
>>>>>>>>> > Please let me know if I am missing something, and your thoughts on the approach.
>>>>>>>>> >
>>>>>>>>> > Thanks,
>>>>>>>>> > Ankur
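>>>>>>>>> >
>>>>>>>>> > P.S. For concreteness, a rough sketch of approach 2 (the cap is a made-up number):
>>>>>>>>> >
>>>>>>>>> >     import logging
>>>>>>>>> >     import threading
>>>>>>>>> >     from concurrent.futures import ThreadPoolExecutor
>>>>>>>>> >
>>>>>>>>> >     MAX_WORKERS = 100  # high upper bound on cached worker threads
>>>>>>>>> >
>>>>>>>>> >     class BundleWorkerPool:
>>>>>>>>> >         """Grows threads on demand, reuses idle ones, warns at the cap."""
>>>>>>>>> >
>>>>>>>>> >         def __init__(self, max_workers=MAX_WORKERS):
>>>>>>>>> >             self._max = max_workers
>>>>>>>>> >             self._active = 0
>>>>>>>>> >             self._lock = threading.Lock()
>>>>>>>>> >             # ThreadPoolExecutor already creates threads lazily and caches
>>>>>>>>> >             # idle ones, so it only needs the warning added on top.
>>>>>>>>> >             self._pool = ThreadPoolExecutor(max_workers=max_workers)
>>>>>>>>> >
>>>>>>>>> >         def submit(self, process_fn, bundle):
>>>>>>>>> >             with self._lock:
>>>>>>>>> >                 self._active += 1
>>>>>>>>> >                 if self._active >= self._max:
>>>>>>>>> >                     logging.warning('Hit the upper bound of %d worker '
>>>>>>>>> >                                     'threads; tasks may starve.', self._max)
>>>>>>>>> >             future = self._pool.submit(process_fn, bundle)
>>>>>>>>> >             future.add_done_callback(self._on_done)
>>>>>>>>> >             return future
>>>>>>>>> >
>>>>>>>>> >         def _on_done(self, _future):
>>>>>>>>> >             with self._lock:
>>>>>>>>> >                 self._active -= 1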