Sounds good to me. GRPC Header of the control channel seems to be a good place to add upper bound information. Added jiras: https://issues.apache.org/jira/browse/BEAM-5166 https://issues.apache.org/jira/browse/BEAM-5167
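For concreteness: gRPC carries headers as (key, value) metadata pairs attached to calls on a channel, so the upper bound could travel as one such pair on the control channel. The sketch below models that exchange in plain Python; the metadata key `beam-max-concurrent-bundles` is a hypothetical name for illustration only, not a key fixed by BEAM-5166/BEAM-5167.

```python
# Hypothetical sketch: the SDK harness advertises its bundle-concurrency
# upper bound as a metadata pair on the control channel, and the runner
# parses it. gRPC metadata is a sequence of (key, value) string pairs,
# which is what we model here without pulling in grpcio.

def harness_metadata(max_concurrent_bundles):
    """Metadata the harness would attach when opening the control channel."""
    return [("beam-max-concurrent-bundles", str(max_concurrent_bundles))]

def read_upper_bound(metadata, default=1):
    """Runner-side parsing: fall back to a single bundle if the key is absent."""
    for key, value in metadata:
        if key == "beam-max-concurrent-bundles":
            return int(value)
    return default
```

A runner would call `read_upper_bound(...)` once per harness and never keep more than that many bundles in flight; an older harness that sends no such header is treated as single-bundle.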
On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <hero...@google.com> wrote:

> Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning api. The SDK harness need not -- and should not -- speculate on what else might be running on the machine:
>
> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>
> A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and use at most that number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever the value is, it is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond it would be rejected (but this shouldn't happen, because the runner should respect that number).
>
> A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (incl. the usage by running bundles).
>
> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <goe...@google.com> wrote:
>
>> I am thinking of the upper bound more along the lines of a theoretical upper limit, or any other static high value, beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish. It will simply reject any additional bundle.
>> Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have the complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for an upper bound from the SDK's point of view.
>>
>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?
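Henning's startup-time heuristic above, max(1, min(cpu*100, mem_mb/10)), written out as a small Python sketch. The resource figures in the examples are illustrative inputs, not values read from a real ProvisionInfo message.

```python
# Henning's heuristic: at least one bundle, roughly 100 concurrent
# bundles per core, and about 10 MB of memory budgeted per bundle,
# whichever constraint binds first.

def max_bundles(cpu, mem_mb):
    return max(1, min(cpu * 100, mem_mb // 10))

print(max_bundles(4, 8192))  # CPU-bound: min(400, 819) -> 400
print(max_bundles(1, 512))   # memory-bound: min(100, 51) -> 51
print(max_bundles(0, 64))    # degenerate inputs still yield at least 1
```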
>>> First thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information, with the runner making a judgement call as to whether it should ask the SDK to do more or less work, but it's not an explicit "don't do more than X bundles in parallel".
>>>
>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <goe...@google.com> wrote:
>>>
>>>> Makes sense. Exposing an upper bound on concurrency along with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.
>>>>
>>>> Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.
>>>>
>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <hero...@google.com> wrote:
>>>>
>>>>> I agree with Luke's observation, with the caveat that "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, some bundles take more resources than others, so any static value will be an estimate or ignore resource limits.
>>>>>
>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.
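The "reject verbosely rather than queue" behaviour Ankur describes, paired with an advertised upper bound, can be sketched as a simple admission check on the harness side. The class and method names here are illustrative, not part of any Beam API.

```python
import threading

# Sketch of bundle admission in an SDK harness: a bundle beyond the
# advertised upper bound is refused loudly instead of being parked in
# a queue behind running bundles (which is what creates the deadlock
# discussed in this thread).

class BundleAdmission:
    def __init__(self, upper_bound):
        self.upper_bound = upper_bound
        self._active = 0
        self._lock = threading.Lock()

    def try_start(self, bundle_id):
        with self._lock:
            if self._active >= self.upper_bound:
                # Verbose rejection: the runner is expected to respect the
                # advertised bound, so reaching this is a scheduling bug.
                raise RuntimeError(
                    "rejecting %s: %d bundles already in flight "
                    "(upper bound %d)" % (bundle_id, self._active,
                                          self.upper_bound))
            self._active += 1

    def finish(self):
        with self._lock:
            self._active -= 1
```

The key property is that a rejected bundle produces an immediate, diagnosable error rather than an indefinite wait.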
>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <goe...@google.com> wrote:
>>>>>
>>>>>> The latter case, supporting single-bundle execution at a time on the SDK while the runner does not use this flag, is exactly the reason we got into the deadlock here.
>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether to use it. But at the same time we expect the SDK to handle an infinite amount of bundles even if it's not efficient.
>>>>>>
>>>>>> Thanks,
>>>>>> Ankur
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I believe in practice SDK harnesses will fall into one of two capabilities: they can process effectively an infinite amount of bundles in parallel, or they can only process a single bundle at a time.
>>>>>>>
>>>>>>> I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient. It may be inefficient for an SDK, but I do believe it should be able to say "I'm not great at anything more than a single bundle at a time"; utilizing this information by a runner should be optional.
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <goe...@google.com> wrote:
>>>>>>>
>>>>>>>> To recap the discussion, it seems that we have come up with the following points.
>>>>>>>>
>>>>>>>> SDKHarness management and initialization:
>>>>>>>>
>>>>>>>> 1. The runner completely owns the work assignment to the SDKHarness.
>>>>>>>> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
>>>>>>>> 3. Spinning up the SDKHarness is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>>>>>>>>
>>>>>>>> SDKHarness expectations (this is more in question, and we should outline the responsibilities of the SDKHarness):
>>>>>>>>
>>>>>>>> 1. The SDKHarness should publish how many concurrent tasks it can execute.
>>>>>>>> 2. The SDKHarness should start executing all the tasks assigned to it in parallel in a timely manner, or fail the task.
>>>>>>>>
>>>>>>>> Also, on the simplification side: I think that for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics for runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
>>>>>>>> A good balance between SDKHarness complexity and runner integration can be helpful for easier adoption.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ankur
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <hero...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Finding a good balance is indeed the art of portability, because the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>
>>>>>>>>> It was originally the idea to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient.
>>>>>>>>> For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have the capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning api) and what the SDK harness in return can do (and a valid answer might be: 1 bundle at a time irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
>>>>>>>>>
>>>>>>>>> The SDK harness and its initialization overhead are entirely SDK, job type and even pipeline specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.
>>>>>>>>>
>>>>>>>>> Henning
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>
>>>>>>>>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and responsibilities.
>>>>>>>>>> Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>>>>
>>>>>>>>>> There was the concern that instantiation of multiple SDK harnesses per TM host is expensive (resource usage, initialization time etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology, or when the SDK harness is just a process on the same host.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness.
>>>>>>>>>>>
>>>>>>>>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for looking into this problem.
>>>>>>>>>>>> The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>>>>>>>>
>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy because they run in separate Flink operators, and the SDK harness would have to have insight into the execution graph (which is not desirable).
>>>>>>>>>>>>
>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>
>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Max
>>>>>>>>>>>>
>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>> > Hi,
>>>>>>>>>>>> >
>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task parallelism on SDKHarness.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>> >
>>>>>>>>>>>> > * Job type: /*Beam Portable Python Batch*/ job on a Flink standalone cluster.
>>>>>>>>>>>> > * Only a single job is scheduled on the cluster.
>>>>>>>>>>>> > * Everything is running on a single machine with a single Flink task manager.
>>>>>>>>>>>> > * Flink task manager slots is 1.
>>>>>>>>>>>> > * Flink parallelism is 1.
>>>>>>>>>>>> > * Python SDKHarness has 1 thread.
>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>> > With a multi-stage job, Flink schedules the dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness.
>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread. MapB will wait for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>> > Set worker_count in pipeline options to more than the expected number of sub-tasks in the pipeline.
>>>>>>>>>>>> >
>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>> >
>>>>>>>>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>>>>>>>>> > 2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>>>>>>>>> >
>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement and the implementation is not runner specific.
>>>>>>>>>>>> > This approach better utilizes resources, as it creates only as many threads as are needed instead of the peak thread requirement.
>>>>>>>>>>>> > And last but not least, it gives the runner control over managing truly active tasks.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Please let me know if I am missing something, and your thoughts on the approach.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>> > Ankur
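A minimal sketch of proposal 2 from the message above: threads are created on demand and reused up to a high upper bound, with a warning when the bound is hit. Python's `ThreadPoolExecutor` already spawns worker threads lazily and caches them. The bound of 300 echoes the value floated elsewhere in the thread and is an assumption, not a settled default; all names are illustrative.

```python
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

# Proposal 2 sketch: dynamically created, cached threads with a high
# upper bound, plus a warning when the bound is reached (a hint that
# the runner may be over-scheduling this harness).

class DynamicBundleExecutor:
    def __init__(self, upper_bound=300):
        self._upper_bound = upper_bound
        # ThreadPoolExecutor adds worker threads only as work arrives
        # and reuses them, so idle capacity costs nothing up front.
        self._executor = ThreadPoolExecutor(max_workers=upper_bound)
        self._active = 0
        self._lock = threading.Lock()

    def submit(self, bundle_fn, *args):
        with self._lock:
            self._active += 1
            if self._active >= self._upper_bound:
                logging.warning(
                    "Hit bundle thread upper bound (%d); the runner may "
                    "be over-scheduling this harness.", self._upper_bound)
        future = self._executor.submit(bundle_fn, *args)
        future.add_done_callback(self._done)
        return future

    def _done(self, _future):
        with self._lock:
            self._active -= 1
```

Note this still queues work beyond the bound rather than rejecting it, which is exactly why the bound must be high enough to cover the runner's maximum concurrent sub-tasks, as the deadlock example shows.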