On Fri, Sep 14, 2018 at 3:01 PM Thomas Weise <[email protected]> wrote:
> That's actually how the Flink runner already works - bundle processing starts when elements are available (see FlinkExecutableStageFunction for batch mode).
>
> But we still have the possibility of the SDK getting concurrent requests due to parallelism (and pipelined execution).

Concurrent requests should be fine, but are there cases (in batch) where a bundle that was started cannot finish without other bundles finishing first? What pipelining would we see if the graph is of the form ExecutableStage - GBK - ExecutableStage - GBK - ... (or is it not always a digraph of this form, possibly with branching)?

> Thanks,
> Thomas
>
> On Fri, Sep 14, 2018 at 2:56 AM Robert Bradshaw <[email protected]> wrote:
>> Currently the best solution we've come up with is that we must process an unbounded number of bundles concurrently to avoid deadlock. Especially in the batch case, this may be wasteful, as we bring up workers for many stages that are not actually executable until upstream stages finish. Since it may be invasive to require runners to only schedule stages that can be actively worked on, I've been thinking about what we could do in the common runner libraries themselves. One idea is to postpone the actual sending of a process bundle request until there is data on the channel to consume. With Reads as Impulses, and triggers as data, all bundles are driven by some input.
>> This would mean we never ask the SDK to process bundles it cannot immediately start working on. There is still the open question of whether being able to *start* a bundle implies that one is able to *finish* a bundle (i.e. do any runners start bundles and then block, pending other bundle completion, before closing the data channel (though clearly a runner can chop a bundle off at any point if it wants)).
>> Does this approach sound feasible?
>> On Thu, Aug 30, 2018 at 2:54 AM Ankur Goenka <[email protected]> wrote:
>>> I managed to write a small document based on the discussion. Please take a look at https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing
>>> On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde <[email protected]> wrote:
>>>> Sending bundles that cannot be executed, i.e., the situation described at the beginning of the thread as causing the deadlock in Flink with mapB. The discussion of exposing (or assuming an infinitely large) concurrency level -- while a useful concept in its own right -- came around as a way to unblock mapB.
>>>> On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik <[email protected]> wrote:
>>>>> Henning, can you clarify what you mean by sending non-executable bundles to the SDK harness and how it is useful for Flink?
>>>>> On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <[email protected]> wrote:
>>>>>> I think it will be useful to the runner to know upfront what the fundamental threading capabilities are for the SDK harness (say, "fixed", "linear", "dynamic", ..) so that the runner can upfront make a good static decision on #harnesses and how many resources they should each have. It's wasteful to give the Foo SDK a whole many-core machine with TBs of memory if it can only support a single bundle at a time. I think this is also in line with what Thomas and Luke are suggesting.
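To make Robert's deferral idea above concrete: a rough sketch of a runner-side dispatcher that only issues the process bundle request once the first input element for that bundle is available. The client object and method names here are hypothetical, not the actual Beam runner-library API.

    class DeferredBundleDispatcher(object):
        """Only ask the SDK to start a bundle once input for it exists."""

        def __init__(self, sdk_client):
            self._sdk_client = sdk_client  # hypothetical wrapper around the Fn API control/data channels
            self._started = set()          # bundle ids already handed to the SDK

        def on_element(self, bundle_id, element):
            if bundle_id not in self._started:
                # First element for this bundle: only now send the process-bundle
                # request, so the SDK never sits on a bundle it cannot make progress on.
                self._started.add(bundle_id)
                self._sdk_client.start_bundle(bundle_id)
            self._sdk_client.send_data(bundle_id, element)

        def on_input_exhausted(self, bundle_id):
            if bundle_id in self._started:
                self._sdk_client.close_data(bundle_id)  # lets the SDK finish the bundle
                self._started.discard(bundle_id)
            # If no element ever arrived, the bundle was never started and
            # there is nothing to finish.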
>>>>>> However, it still seems to me to be a semantically problematic idea to send non-executable bundles to the SDK harness. I understand it's useful for Flink, but is that really the best path forward?
>>>>>> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <[email protected]> wrote:
>>>>>>> That's right.
>>>>>>> To add to it: we added multi-threading to Python streaming because a single thread is suboptimal for the streaming use case.
>>>>>>> Shall we move towards a conclusion on the SDK bundle processing upper bound?
>>>>>>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>> Ankur, I can see where you are going with your argument. I believe there is certain information which is static and won't change at pipeline creation time (such as the Python SDK being most efficient doing one bundle at a time) and some which is best determined at runtime, like memory and CPU limits, worker count.
>>>>>>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>> I would prefer to keep it dynamic, as it can be changed by the infrastructure or the pipeline author. For example, in the case of Python, the number of concurrent bundles can be changed by setting the pipeline option worker_count. And for Java it can be computed based on the CPUs on the machine.
>>>>>>>>> For the Flink runner, we can use the worker_count parameter for now to increase the parallelism. And we can have 1 container for each mapPartition task on Flink while reusing containers, as container creation is expensive, especially for Python where it installs a bunch of dependencies. There is 1 caveat though: I have seen machine crashes because of too many simultaneous container creations. We can rate-limit container creation in the code to avoid this.
>>>>>>>>> Thanks,
>>>>>>>>> Ankur
>>>>>>>>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>> +1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information.
>>>>>>>>>> Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline creation. I wouldn't stick this into PipelineOptions for the long term. If you have time to capture these thoughts and update the environment proto, I would suggest going down that path. Otherwise anything short term like PipelineOptions will do.
>>>>>>>>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:
>>>>>>>>>>> For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission?
>>>>>>>>>>> Regarding the use of GRPC headers: why not make this explicit in the proto instead?
>>>>>>>>>>> WRT the runner dictating resource constraints: the runner actually may also not have that information.
>>>>>>>>>>> It would need to be supplied as part of the pipeline options? The cluster resource manager needs to allocate resources for both the runner and the SDK harness(es).
>>>>>>>>>>> Finally, what can be done to unblock the Flink runner / Python until the solution discussed here is in place? An extra runner option for SDK singleton on/off?
>>>>>>>>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>> Sounds good to me.
>>>>>>>>>>>> The GRPC header of the control channel seems to be a good place to add the upper bound information.
>>>>>>>>>>>> Added jiras:
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5167
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>>>> Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning api. The SDK harness need not -- and should not -- speculate on what else might be running on the machine:
>>>>>>>>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>>>>>>>> A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and use that as the maximum number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever it is, that value is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond it would be rejected (but this shouldn't happen, because the runner should respect that number).
>>>>>>>>>>>>> A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (incl. the usage by running bundles).
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>>> I am thinking of the upper bound more along the lines of a theoretical upper limit, or some other static high value, beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish. It will simply reject any additional bundle.
>>>>>>>>>>>>>> Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have the complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for the upper bound from the SDK's point of view.
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>> Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?
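As one possible answer to that question: a minimal sketch of the kind of startup-time computation Henning suggests above, assuming the harness learns its CPU/memory allotment from the provisioning API (the parameter names here are illustrative, not actual proto fields).

    def max_concurrent_bundles(cpu_cores, mem_mb, user_override=None):
        # A user-provided value (e.g. a worker_count pipeline option) wins.
        if user_override is not None:
            return user_override
        # max(1, min(cpu*100, mem_mb/10)) from the discussion above; any such
        # static value is only an estimate, since bundles vary in cost.
        return max(1, min(int(cpu_cores * 100), mem_mb // 10))

    # Example: 4 cores and 16384 MB gives min(400, 1638) = 400 concurrent bundles.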
>>>>>>>>>>>>>>> The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information, with the runner making a judgement call as to whether it should ask the SDK to do more work or less, but it's not an explicit "don't do more than X bundles in parallel".
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>>>>> Makes sense. Having an exposed upper bound on concurrency together with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.
>>>>>>>>>>>>>>>> Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>>>>>>>> I agree with Luke's observation, with the caveat that an "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, as some bundles take more resources than others, any static value will be an estimate or ignore resource limits.
>>>>>>>>>>>>>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.
>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>>>>>>> The latter case, where the SDK supports only single-bundle execution at a time and the runner does not use this flag, is exactly the reason we got into the deadlock here. I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether or not to use it, but at the same time expecting the SDK to handle an infinite amount of bundles even if it is not efficient.
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of two capabilities: they can effectively process an infinite amount of bundles in parallel, or they can only process a single bundle at a time.
>>>>>>>>>>>>>>>>>>> I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient. It may be inefficient for an SDK, but I do believe it should be able to say "I'm not great at anything more than a single bundle at a time", though whether a runner makes use of this information should be optional.
>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>> To recap the discussion, it seems that we have come up with the following points.
>>>>>>>>>>>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>>>>>>>>>>> 1. The runner completely owns the work assignment to the SDKHarness.
>>>>>>>>>>>>>>>>>>>> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
>>>>>>>>>>>>>>>>>>>> 3. Spinning up SDKHarnesses is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>>>>>>>>>>>>>>>>>>>> SDKHarness expectations (this is more in question, and we should outline the responsibilities of the SDKHarness):
>>>>>>>>>>>>>>>>>>>> 1. The SDKHarness should publish how many concurrent tasks it can execute.
>>>>>>>>>>>>>>>>>>>> 2. The SDKHarness should start executing all the task items assigned to it, in parallel and in a timely manner, or fail the task.
>>>>>>>>>>>>>>>>>>>> Also, on the simplification side: I think that for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics as part of runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources etc., so I am a bit hesitant to add runner scheduling specifics to runner integration. A good balance between SDKHarness complexity and runner integration can make adoption easier.
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>> Finding a good balance is indeed the art of portability, because the range of capabilities (and assumptions) on both sides is wide.
>>>>>>>>>>>>>>>>>>>>> The original idea was to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning api) and what the SDK harness in return can do (and a valid answer might be: 1 bundle at a time irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
>>>>>>>>>>>>>>>>>>>>> The SDK harness and initialization overhead is entirely SDK-, job-type- and even pipeline-specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.
>>>>>>>>>>>>>>>>>>>>> Henning
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>>>>>>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>>>>>>>>>>>>>>>> There was the concern that instantiation of multiple SDK harnesses per TM host is expensive (resource usage, initialization time etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology, or when the SDK harness is just a process on the same host.
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness.
>>>>>>>>>>>>>>>>>>>>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>>>>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological order.
>>>>>>>>>>>>>>>>>>>>>>>> Doing that is not easy, because they run in separate Flink operators and the SDK Harness would have to have insight into the execution graph (which is not desirable).
>>>>>>>>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task parallelism on the SDKHarness.
>>>>>>>>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ job on a Flink standalone cluster.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Everything is running on a single machine with a single Flink task manager.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>>>>>>>>>>>>> >   * The Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules different dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness. It's possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread, and MapB will wait for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>>>>>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>>>>>>>>>>>>>>>>>>>>> > 2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>>>>>>>>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement and the implementation is not runner-specific. This approach also makes better use of resources, as it creates only as many threads as are needed instead of the peak thread requirement. And last but not least, it gives the runner control over managing the truly active tasks.
>>>>>>>>>>>>>>>>>>>>>>>> > Please let me know if I am missing something, and your thoughts on the approach.
>>>>>>>>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> > Ankur
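For concreteness, a small self-contained toy (plain Python, not Beam or Flink code) that mimics the failure mode described in the original mail: a harness with a fixed-size thread pool receives MapB's bundle before MapA's, and MapB blocks waiting for MapA's output. With a single worker thread the pipeline cannot make progress; with a worker_count of 2 or more it completes.

    import queue
    from concurrent.futures import ThreadPoolExecutor

    def run(worker_count):
        a_to_b = queue.Queue()  # stands in for the MapA -> GroupBy -> MapB edge

        def map_a():
            a_to_b.put("element produced by MapA")

        def map_b():
            # Blocks until MapA produces something (times out instead of hanging forever).
            return a_to_b.get(timeout=5)

        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            # The runner happens to hand MapB's bundle to the harness first.
            map_b_result = pool.submit(map_b)
            pool.submit(map_a)
            return map_b_result.result(timeout=10)

    print(run(worker_count=2))  # completes
    # run(worker_count=1) stalls: MapB occupies the only thread until its get() times
    # out, which is the single-thread deadlock described above.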
