Ankur, I can see where you are going with your argument. I believe there is certain information which is static and known at pipeline creation time (such as the Python SDK being most efficient doing one bundle at a time), and some which is best determined at runtime, like memory and CPU limits and worker count.
On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]> wrote:

I would prefer to keep it dynamic, as it can be changed by the infrastructure or the pipeline author. In the case of Python, the number of concurrent bundles can be changed by setting the pipeline option worker_count, and for Java it can be computed from the CPUs on the machine.

For the Flink runner, we can use the worker_count parameter for now to increase the parallelism, and we can have one container for each mapPartition task on Flink while reusing containers, since container creation is expensive, especially for Python where it installs a bunch of dependencies. There is one caveat though: I have seen machine crashes because of too many simultaneous container creations. We can rate limit container creation in the code to avoid this.

Thanks,
Ankur

On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:

+1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information.

Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline creation. I wouldn't stick this into PipelineOptions for the long term.
If you have time to capture these thoughts and update the environment proto, I would suggest going down that path. Otherwise anything short term like PipelineOptions will do.

On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:

For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission?

Regarding the use of gRPC headers: why not make this explicit in the proto instead?

Regarding the runner dictating resource constraints: the runner actually may also not have that information; it would need to be supplied as part of the pipeline options. The cluster resource manager needs to allocate resources for both the runner and the SDK harness(es).

Finally, what can be done to unblock the Flink runner / Python until the solution discussed here is in place? An extra runner option for SDK singleton on/off?

On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:

Sounds good to me.
A gRPC header on the control channel seems to be a good place to add the upper bound information.
Added JIRAs:
https://issues.apache.org/jira/browse/BEAM-5166
https://issues.apache.org/jira/browse/BEAM-5167

On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]> wrote:

Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning API. The SDK harness need not (and should not) speculate on what else might be running on the machine:

https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69

A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and use at most that number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever the value is, it is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond that would be rejected (but this shouldn't happen, because the runner should respect that number).

A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (including the usage by running bundles).
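[Editor's note: a minimal sketch of the startup-time heuristic Henning describes above, assuming the harness has already learned its CPU and memory budget (for example via the provisioning API); the function name, parameters, and fallback value are illustrative only, not actual Beam code.]

```python
def max_concurrent_bundles(cpu, mem_mb, user_override=None):
    """Upper bound on bundles one SDK harness should run in parallel."""
    if user_override is not None:
        return user_override              # a user-provided value always wins
    if cpu is None or mem_mb is None:
        return 300                        # fall back to a hardcoded high value
    return max(1, min(int(cpu * 100), int(mem_mb / 10)))

# Example: 4 cores and 16 GiB of memory -> min(400, 1638) -> 400 bundles.
print(max_concurrent_bundles(cpu=4, mem_mb=16 * 1024))
```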
On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:

I am thinking of the upper bound more along the lines of a theoretical upper limit, or any other static high value, beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish; it will simply reject any additional bundle.
Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have a complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for the upper bound from the SDK's point of view.

On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:

Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?

The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information, with the runner making a judgement call as to whether it should ask the SDK to do more work or less, but it is not an explicit "don't do more than X bundles in parallel".

On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]> wrote:

Makes sense. Exposing an upper bound on concurrency along with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements from the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.

Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.

On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]> wrote:

I agree with Luke's observation, with the caveat that "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, since some bundles take more resources than others, any static value will be an estimate or ignore resource limits.

That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.
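[Editor's note: a sketch of the rejection behaviour Ankur describes above, assuming a fixed upper bound and a harness that refuses to queue work behind in-flight bundles; class and exception names are invented for illustration, this is not existing Beam code.]

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BundleRejected(Exception):
    """Raised when the harness is asked to exceed its concurrency bound."""

class BoundedBundleExecutor:
    def __init__(self, upper_bound):
        self._slots = threading.Semaphore(upper_bound)
        self._pool = ThreadPoolExecutor(max_workers=upper_bound)

    def submit(self, process_bundle_fn, *args):
        # Non-blocking acquire: never queue a bundle behind running bundles.
        if not self._slots.acquire(blocking=False):
            raise BundleRejected(
                "all execution slots busy; runner exceeded the published bound")
        future = self._pool.submit(process_bundle_fn, *args)
        future.add_done_callback(lambda _: self._slots.release())
        return future
```

The per-bundle "overloaded" error response Henning mentions later in the thread could, under these assumptions, be produced by catching BundleRejected at the control-plane layer and turning it into an error response.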
On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]> wrote:

The latter case, where the SDK supports single-bundle execution at a time and the runner does not use this flag, is exactly the reason we got into the deadlock here.
I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether to use it, but at the same time expecting the SDK to handle an infinite amount of bundles even if it is not efficient.

Thanks,
Ankur

On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]> wrote:

I believe in practice SDK harnesses will fall into one of two capabilities: they can process effectively an infinite amount of bundles in parallel, or they can only process a single bundle at a time.

I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient. It may be inefficient for an SDK, but I do believe it should be able to say "I'm not great at anything more than a single bundle at a time"; utilizing this information should be optional for a runner.

On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:

To recap the discussion, it seems that we have come up with the following points.

SDKHarness management and initialization:

1. The runner completely owns the work assignment to the SDKHarness.
2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
3. Spinning up the SDKHarness is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.

SDKHarness expectations (this is more in question, and we should outline the responsibilities of the SDKHarness):

1. The SDKHarness should publish how many concurrent tasks it can execute.
2. The SDKHarness should start executing all the tasks assigned to it in parallel in a timely manner, or fail the task.

Also, on the simplification side: I think for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics as part of runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
A good balance between SDKHarness complexity and runner integration can help with easier adoption.

Thanks,
Ankur
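[Editor's note: the recap expects the SDKHarness to publish how many concurrent tasks it can execute; Ankur's earlier suggestion of carrying that information as a gRPC header on the control channel could look roughly like the sketch below. The header keys are invented here and this wiring is only one possible design, not an agreed Fn API convention.]

```python
import grpc
from apache_beam.portability.api import beam_fn_api_pb2_grpc

def open_control_stream(control_address, response_iterator,
                        max_bundles, optimal_bundles):
    channel = grpc.insecure_channel(control_address)
    stub = beam_fn_api_pb2_grpc.BeamFnControlStub(channel)
    metadata = [
        # Hypothetical header names; the Fn API defines no such keys today.
        ("x-beam-max-concurrent-bundles", str(max_bundles)),
        ("x-beam-optimal-concurrent-bundles", str(optimal_bundles)),
    ]
    # Control is a bidirectional stream: the harness sends InstructionResponses
    # and receives InstructionRequests from the runner.
    return stub.Control(response_iterator, metadata=metadata)
```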
On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:

Finding a good balance is indeed the art of portability, because the range of capabilities (and assumptions) on both sides is wide.

The original idea was to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient; a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have the capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1, i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning API) and what the SDK harness in return can do (and a valid answer might be: 1 bundle at a time irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.

The SDK harness and initialization overhead is entirely SDK, job type, and even pipeline specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.

Henning
On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:

It is good to see this discussed!

I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it and that there needs to be more than one way for the runner to spin up SDK harnesses.

There was the concern that instantiation of multiple SDK harnesses per TM host is expensive (resource usage, initialization time, etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology or when the SDK harness is just a process on the same host.

Thanks,
Thomas

On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:

SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness.

I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.

On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:

Hi Ankur,

Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.

The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy because they run in separate Flink operators and the SDK harness would have to have insight into the execution graph (which is not desirable).

The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.

The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.

Best,
Max
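[Editor's note: a toy illustration (not Beam code) of the deadlock Max explains above and Ankur describes below: with a single worker thread, a downstream stage submitted first holds the only thread while waiting on the upstream stage, which therefore never runs.]

```python
import queue
from concurrent.futures import ThreadPoolExecutor, TimeoutError

channel = queue.Queue()

def map_b():                  # downstream stage: needs MapA's output
    return channel.get()      # blocks while holding the only worker thread

def map_a():                  # upstream stage: would unblock MapB
    channel.put("element")

pool = ThreadPoolExecutor(max_workers=1)   # mirrors worker_count = 1
fb = pool.submit(map_b)       # MapB happens to reach the harness first
fa = pool.submit(map_a)       # queued behind MapB and never runs
try:
    fb.result(timeout=2)
except TimeoutError:
    print("deadlocked with a single execution thread")
    channel.put(None)         # unblock MapB so the script can exit cleanly
```

With max_workers set to 2 or more, both stages complete and no deadlock occurs.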
On 17.08.18 03:10, Ankur Goenka wrote:

Hi,

tl;dr: Deadlock in task execution caused by limited task parallelism on the SDKHarness.

Setup:

* Job type: Beam portable Python batch job on a Flink standalone cluster.
* Only a single job is scheduled on the cluster.
* Everything is running on a single machine with a single Flink task manager.
* Flink task manager slots is 1.
* Flink parallelism is 1.
* The Python SDKHarness has 1 thread.

Example pipeline:
Read -> MapA -> GroupBy -> MapB -> WriteToSink

Issue:
With a multi-stage job, Flink schedules the different dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness.
It is possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread while it waits for input from MapA. This gets us to a deadlock in a simple pipeline.

Mitigation:
Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.

Proposal:

1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.

We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement and the implementation is not runner specific. This approach also makes better use of resources, as it creates only as many threads as needed instead of the peak thread requirement.
And last but not least, it gives the runner control over managing the truly active tasks.

Please let me know if I am missing something, and your thoughts on the approach.

Thanks,
Ankur
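[Editor's note: a rough sketch of what proposal 2 above could look like, assuming a pool that grows threads on demand, caches them for reuse, and warns when a high upper bound is reached; this is illustrative only, not the actual SDK worker implementation.]

```python
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

class GrowingBundleExecutor:
    """Create worker threads on demand, reuse them, warn at a high cap."""

    def __init__(self, upper_bound=1000):
        self._upper_bound = upper_bound
        self._in_flight = 0
        self._lock = threading.Lock()
        # ThreadPoolExecutor only spawns threads as work arrives and then
        # keeps them around, which gives the create-and-cache behaviour.
        self._pool = ThreadPoolExecutor(max_workers=upper_bound)

    def submit(self, process_bundle_fn, *args):
        with self._lock:
            self._in_flight += 1
            if self._in_flight >= self._upper_bound:
                logging.warning(
                    "SDK harness hit its thread upper bound (%d); additional "
                    "bundles will queue behind running ones", self._upper_bound)
        future = self._pool.submit(process_bundle_fn, *args)
        future.add_done_callback(self._release)
        return future

    def _release(self, _future):
        with self._lock:
            self._in_flight -= 1
```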
