+1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information.
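To make that concrete, the two numbers this thread keeps coming back to could be as small as the following sketch (Python; the names HarnessCapacity, max_concurrent_bundles and optimum_concurrent_bundles are placeholders for whatever ends up in the environment/provisioning protos, not actual fields):

    import collections

    # A hard bound for correctness plus an advisory optimum for efficiency.
    HarnessCapacity = collections.namedtuple(
        'HarnessCapacity',
        ['max_concurrent_bundles',       # bundles beyond this are rejected
         'optimum_concurrent_bundles'])  # hint only; a runner may ignore it

    def runner_can_schedule(capacity, bundles_in_flight):
        # Correctness depends only on the hard bound; the optimum is advisory.
        return bundles_in_flight < capacity.max_concurrent_bundles

A simple SDK (or a simple runner integration) can just publish 1 for both values, as Ankur suggests below.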
Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline creation. I wouldn't stick this into PipelineOptions for the long term. If you have time to capture these thoughts and update the environment proto, I would suggest going down that path. Otherwise, anything short term like PipelineOptions will do.

On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:

> For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission?
>
> Regarding use of GRPC headers: Why not make this explicit in the proto instead?
>
> WRT the runner dictating resource constraints: The runner actually may also not have that information. It would need to be supplied as part of the pipeline options? The cluster resource manager needs to allocate resources for both the runner and the SDK harness(es).
>
> Finally, what can be done to unblock the Flink runner / Python until the solution discussed here is in place? An extra runner option for SDK singleton on/off?
>
> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:
>
>> Sounds good to me.
>> A GRPC header on the control channel seems to be a good place to add the upper bound information.
>> Added jiras:
>> https://issues.apache.org/jira/browse/BEAM-5166
>> https://issues.apache.org/jira/browse/BEAM-5167
>>
>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]> wrote:
>>
>>> Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning api. The SDK harness need not -- and should not -- speculate on what else might be running on the machine:
>>>
>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>
>>> A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and then use at most that number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever it is, that value is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond it would be rejected (but this shouldn't happen, because the runner should respect that number).
>>>
>>> A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (incl. the usage by running bundles).
>>>
>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:
>>>
>>>> I am thinking of the upper bound more along the lines of a theoretical upper limit, or some other static high value, beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish; it will simply reject any additional bundle.
>>>> Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have the complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for the upper bound from the SDK's point of view.
>>>>
>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?
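(For concreteness, the kind of startup-time computation Henning sketches above could look roughly like the following Python; cpu_count and mem_mb would come from the provisioning API resources, and the exact constants are only illustrative, not a recommendation:)

    def static_bundle_limit(cpu_count=None, mem_mb=None, user_override=None):
        # Hard upper bound on concurrent bundles, computed once at startup.
        if user_override is not None:
            return max(1, user_override)
        if cpu_count is None or mem_mb is None:
            return 300  # the hardcoded fallback mentioned above
        return max(1, min(int(cpu_count * 100), int(mem_mb) // 10))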
>>>>> The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information and the runner would make a judgement call as to whether it should ask the SDK to do more work or less, but that is not an explicit "don't do more than X bundles in parallel".
>>>>>
>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]> wrote:
>>>>>
>>>>>> Makes sense. Exposing an upper bound on concurrency along with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.
>>>>>>
>>>>>> Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]> wrote:
>>>>>>
>>>>>>> I agree with Luke's observation, with the caveat that "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, since some bundles take more resources than others, any static value will be an estimate or ignore resource limits.
>>>>>>>
>>>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>
>>>>>>>> The latter case -- an SDK supporting execution of a single bundle at a time and the runner not using this flag -- is exactly the reason we got into the deadlock here.
>>>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether or not to use it, while at the same time expecting the SDK to handle an infinite amount of bundles even if that is not efficient.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ankur
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I believe in practice SDK harnesses will fall into one of two capabilities: they can process effectively an infinite amount of bundles in parallel, or they can only process a single bundle at a time.
>>>>>>>>>
>>>>>>>>> I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient. It may be inefficient for an SDK, but I do believe it should be able to say "I'm not great at anything more than a single bundle at a time"; utilizing this information, however, should be optional for a runner.
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> To recap the discussion, it seems that we have come up with the following points.
>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>
>>>>>>>>>> 1. The runner completely owns work assignment to the SDKHarness.
>>>>>>>>>> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
>>>>>>>>>> 3. Spinning up SDKHarnesses is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>>>>>>>>>>
>>>>>>>>>> SDKHarness expectations. This is more in question, and we should outline the responsibilities of the SDKHarness:
>>>>>>>>>>
>>>>>>>>>> 1. The SDKHarness should publish how many concurrent tasks it can execute.
>>>>>>>>>> 2. The SDKHarness should start executing all the task items assigned to it, in parallel and in a timely manner, or fail the task.
>>>>>>>>>>
>>>>>>>>>> Also, on the simplification side: I think that for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics as part of runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
>>>>>>>>>> A good balance between SDKHarness complexity and runner integration can be helpful for easier adoption.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ankur
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Finding a good balance is indeed the art of portability, because the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>>>
>>>>>>>>>>> It was originally the idea to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have the capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning api) and what the SDK harness can do in return (and a valid answer might be: 1 bundle at a time irrespective of the resources given) -- or a per-bundle special "overloaded" error response.
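(A per-bundle "overloaded" rejection could be as simple as the sketch below; the response shape and names are made up for illustration and are not an existing Fn API message:)

    import threading

    class BundleGate(object):
        """Admits at most max_bundles concurrent bundles; rejects the rest loudly."""

        def __init__(self, max_bundles):
            self._slots = threading.Semaphore(max_bundles)

        def try_start_bundle(self, instruction_id):
            if not self._slots.acquire(False):  # non-blocking acquire
                # Reject verbosely instead of queueing, so the runner can
                # retry the bundle elsewhere or back off.
                return {'instruction_id': instruction_id,
                        'error': 'harness at capacity, bundle rejected'}
            return None  # admitted; caller must call finish_bundle() when done

        def finish_bundle(self):
            self._slots.release()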
>>>>>>>>>>> For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
>>>>>>>>>>>
>>>>>>>>>>> The SDK harness and its initialization overhead are entirely SDK-, job-type- and even pipeline-specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.
>>>>>>>>>>>
>>>>>>>>>>> Henning
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>
>>>>>>>>>>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way the runner can spin up SDK harnesses.
>>>>>>>>>>>>
>>>>>>>>>>>> There was the concern that instantiation of multiple SDK harnesses per TM host is expensive (resource usage, initialization time, etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers, but it may look totally different for a streaming topology or when the SDK harness is just a process on the same host.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological order.
>>>>>>>>>>>>>> Doing that is not easy, because they run in separate Flink operators and the SDK harness would have to have insight into the execution graph (which is not desirable).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task parallelism on the SDKHarness.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > * Job type: /*Beam Portable Python Batch*/ job on a Flink standalone cluster.
>>>>>>>>>>>>>> > * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>> > * Everything is running on a single machine with a single Flink task manager.
>>>>>>>>>>>>>> > * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>> > * Flink Parallelism is 1.
>>>>>>>>>>>>>> > * Python SDKHarness has 1 thread.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules the dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness.
>>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread. MapB will wait for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>>>>>>>>>>> > 2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2.
>>>>>>>>>>>>>> > It is simpler to implement, and the implementation is not runner specific. This approach also utilizes resources better, as it creates only as many threads as are actually needed rather than the peak thread requirement.
>>>>>>>>>>>>>> > And last but not least, it gives the runner control over managing the truly active tasks.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Please let me know if I am missing something, and what your thoughts are on the approach.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>> > Ankur
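For reference, a minimal sketch of what proposal 2 could look like on the SDK side -- a cached, dynamically growing pool of bundle-processing threads with a high upper bound and a warning when the bound is hit. Class and method names are illustrative, not the actual sdk_worker code:

    import logging
    import threading
    from concurrent.futures import ThreadPoolExecutor

    class BundleThreadPool(object):
        """Grows up to max_workers threads and warns when the ceiling is reached."""

        def __init__(self, max_workers=300):
            self._max_workers = max_workers
            self._executor = ThreadPoolExecutor(max_workers=max_workers)
            self._lock = threading.Lock()
            self._in_flight = 0

        def submit(self, process_bundle_fn, instruction):
            with self._lock:
                self._in_flight += 1
                if self._in_flight >= self._max_workers:
                    logging.warning(
                        'All %d bundle threads are busy; additional bundles '
                        'will queue and may deadlock on upstream stages.',
                        self._max_workers)
            future = self._executor.submit(process_bundle_fn, instruction)
            future.add_done_callback(lambda _: self._done())
            return future

        def _done(self):
            with self._lock:
                self._in_flight -= 1

With a pool like this, the worker_count mitigation above only needs to be large enough that the pool never saturates on the stages a runner schedules concurrently.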
