I managed to write a small document based on the discussion. Please take a look at https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing
On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde <[email protected]> wrote:

Sending bundles that cannot be executed, i.e., the situation described to cause the deadlock in Flink at the beginning of the thread with MapB. The discussion of exposing (or assuming an infinitely large) concurrency level -- while a useful concept in its own right -- came around as a way to unblock MapB.

On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik <[email protected]> wrote:

Henning, can you clarify what you mean by sending non-executable bundles to the SDK harness and how it is useful for Flink?

On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <[email protected]> wrote:

I think it will be useful to the runner to know upfront what the fundamental threading capabilities of the SDK harness are (say, "fixed", "linear", "dynamic", ...) so that the runner can make a good static decision upfront on the number of harnesses and how many resources each should have. It's wasteful to give the Foo SDK a whole many-core machine with TBs of memory if it can only support a single bundle at a time. I think this is also in line with what Thomas and Luke are suggesting.

However, it still seems to me to be a semantically problematic idea to send non-executable bundles to the SDK harness. I understand it's useful for Flink, but is that really the best path forward?

On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <[email protected]> wrote:

That's right.
To add to it: we added multi-threading to Python streaming, as a single thread is suboptimal for the streaming use case.
Shall we move towards a conclusion on the SDK bundle processing upper bound?

On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <[email protected]> wrote:

Ankur, I can see where you are going with your argument. I believe there is certain information which is static and won't change at pipeline creation time (such as the Python SDK being most efficient doing one bundle at a time), and some which is best determined at runtime, like memory and CPU limits and worker count.

On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]> wrote:

I would prefer to keep it dynamic, as it can be changed by the infrastructure or the pipeline author. For example, in the case of Python the number of concurrent bundles can be changed by setting the pipeline option worker_count, and for Java it can be computed based on the CPUs of the machine.

For the Flink runner, we can use the worker_count parameter for now to increase the parallelism. And we can have one container for each mapPartition task on Flink while reusing containers, as container creation is expensive, especially for Python where it installs a bunch of dependencies. There is one caveat though: I have seen machine crashes because of too many simultaneous container creations. We can rate-limit container creation in the code to avoid this.

Thanks,
Ankur

On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:

+1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information.
Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline creation. I wouldn't stick this into PipelineOptions for the long term. If you have time to capture these thoughts and update the environment proto, I would suggest going down that path. Otherwise anything short term like PipelineOptions will do.

On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:

For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission?

Regarding use of GRPC headers: why not make this explicit in the proto instead?

WRT the runner dictating resource constraints: the runner may actually not have that information either. Would it need to be supplied as part of the pipeline options? The cluster resource manager needs to allocate resources for both the runner and the SDK harness(es).

Finally, what can be done to unblock the Flink runner / Python until the solution discussed here is in place? An extra runner option for SDK singleton on/off?

On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:

Sounds good to me.
A GRPC header on the control channel seems to be a good place to add the upper bound information.
Added JIRAs:
https://issues.apache.org/jira/browse/BEAM-5166
https://issues.apache.org/jira/browse/BEAM-5167

On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]> wrote:

Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning api. The SDK harness need not -- and should not -- speculate on what else might be running on the machine:

https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69

A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and use at most that number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever it is, that value is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond that would be rejected (but this shouldn't happen, because the runner should respect that number).

A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (incl. the usage by running bundles).

On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:

I am thinking of the upper bound more along the lines of a theoretical upper limit, or any other static high value, beyond which the SDK will reject bundles verbosely.
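(As a minimal illustration of the startup-time computation Henning describes above: the heuristic is the one from this thread, while the function and parameter names are hypothetical, not an existing Beam API.)

# Minimal illustrative sketch of the startup-time cap described above; the
# heuristic max(1, min(cpu*100, mem_mb/10)) is from this thread, while the
# function and parameter names are hypothetical, not an existing Beam API.
def max_concurrent_bundles(cpu=None, mem_mb=None, user_override=None):
    """Upper bound on bundles the harness should process at any given time."""
    if user_override is not None:
        return user_override          # a user-provided value wins
    if cpu is None or mem_mb is None:
        return 300                    # no resource info: fall back to a hardcoded cap
    return max(1, min(cpu * 100, mem_mb // 10))

# e.g. a harness given 4 CPUs and 16 GiB would advertise
# max_concurrent_bundles(cpu=4, mem_mb=16384) == 400 bundles in flight,
# a value it would then communicate to the runner as its hard limit.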
The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish; it will simply reject any additional bundle.
Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have the complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for the upper bound from the SDK's point of view.

On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:

Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?

The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information and the runner would make a judgement call as to whether it should ask the SDK to do more work or less, but it's not an explicit "don't do more than X bundles in parallel".

On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]> wrote:

Makes sense. Exposing an upper bound on concurrency together with the optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.

Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.

On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]> wrote:

I agree with Luke's observation, with the caveat that "an infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, some bundles take more resources than others, so any static value will either be an estimate or ignore resource limits.

That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.

On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]> wrote:

The latter case, where the SDK supports single-bundle execution at a time and the runner does not use this flag, is exactly the reason we got into the deadlock here.
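(A small sketch of the "reject any additional bundle" behaviour described above; the capacity guard and its names are hypothetical, not actual SDK harness code.)

# Illustrative only: fail loudly when a bundle arrives beyond the advertised
# upper bound instead of queueing it behind in-flight bundles.
import threading

class BundleCapacity:
    def __init__(self, max_bundles):
        self._slots = threading.BoundedSemaphore(max_bundles)

    def start(self, instruction_id):
        # Reject verbosely rather than queue behind in-flight bundles.
        if not self._slots.acquire(blocking=False):
            raise RuntimeError(
                'Rejecting bundle %s: harness is already at its advertised '
                'concurrency upper bound' % instruction_id)

    def finish(self):
        self._slots.release()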
I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether or not to use it, but at the same time expecting the SDK to handle an infinite amount of bundles even if it's not efficient.

Thanks,
Ankur

On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]> wrote:

I believe in practice SDK harnesses will fall into one of two capability classes: they can effectively process an infinite amount of bundles in parallel, or they can only process a single bundle at a time.

I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient. It may be inefficient for an SDK, but I do believe it should be able to say that it is not great at anything more than a single bundle at a time, though utilizing this information by a runner should be optional.

On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:

To recap the discussion, it seems that we have come up with the following points.

SDKHarness management and initialization:

1. The runner completely owns the work assignment to the SDKHarness.
2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
3. Spinning up the SDKHarness is the runner's responsibility, and it can be done statically (a fixed pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.

SDKHarness expectations (this is more in question, and we should outline the responsibilities of the SDKHarness):

1. The SDKHarness should publish how many concurrent tasks it can execute.
2. The SDKHarness should start executing all the task items assigned to it in parallel in a timely manner, or fail the task.

Also, on the simplification side: I think for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics for runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
A good balance between SDKHarness complexity and runner integration can make adoption easier.

Thanks,
Ankur

On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:

Finding a good balance is indeed the art of portability, because the range of capabilities (and assumptions) on both sides is wide.

The original idea was to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have the capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning api) and what the SDK harness in return can do (and a valid answer might be: 1 bundle at a time, irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.

The SDK harness and initialization overhead is entirely SDK, job type and even pipeline specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.

Henning

On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:

It is good to see this discussed!

I think there needs to be a good balance between the SDK harness capabilities/complexity and responsibilities.
Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way for the runner to spin up SDK harnesses.

There was the concern that instantiating multiple SDK harnesses per TM host is expensive (resource usage, initialization time, etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology or when the SDK harness is just a process on the same host.

Thanks,
Thomas

On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:

SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes it.

I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.

On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:

Hi Ankur,

Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.

The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy because they run in separate Flink operators, and the SDK harness would have to have insight into the execution graph (which is not desirable).

The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.

Best,
Max

On 17.08.18 03:10, Ankur Goenka wrote:

Hi,

tl;dr Deadlock in task execution caused by limited task parallelism on the SDKHarness.

*Setup:*

* Job type: Beam Portable Python Batch job on a Flink standalone cluster.
* Only a single job is scheduled on the cluster.
* Everything is running on a single machine with a single Flink task manager.
* Flink Task Manager Slots is 1.
* Flink Parallelism is 1.
* The Python SDKHarness has 1 thread.

*Example pipeline:*
Read -> MapA -> GroupBy -> MapB -> WriteToSink

*Issue:*
With a multi-stage job, Flink schedules the different dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness. It's possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread, and MapB will wait for input from MapA. This gets us to a deadlock in a simple pipeline.

*Mitigation:*
Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.

*Proposal:*

1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
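(A rough sketch of what proposal 2 could look like in a Python harness; the names, the 300-thread bound, and the process_fn hook are illustrative assumptions, not the actual sdk_worker implementation.)

# Illustrative sketch of proposal 2: worker threads are created lazily and
# cached, up to a high static upper bound, with a warning when the bound is hit.
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_BUNDLE_THREADS = 300          # high static bound (user-provided or resource-derived)

_executor = ThreadPoolExecutor(max_workers=MAX_BUNDLE_THREADS)
_in_flight = 0
_lock = threading.Lock()

def submit_bundle(bundle_request, process_fn):
    """Schedule one bundle on the cached pool; warn when the bound is reached."""
    global _in_flight
    with _lock:
        _in_flight += 1
        if _in_flight >= MAX_BUNDLE_THREADS:
            logging.warning('%d bundles in flight; hit the thread upper bound.',
                            _in_flight)

    def _run():
        global _in_flight
        try:
            return process_fn(bundle_request)
        finally:
            with _lock:
                _in_flight -= 1

    # ThreadPoolExecutor only spawns a new thread when no existing thread is idle,
    # so the pool grows with actual concurrency instead of the peak requirement.
    return _executor.submit(_run)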
We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement, and the implementation is not runner-specific. This approach utilizes resources better, as it creates only as many threads as are needed instead of the peak thread requirement. And last but not least, it gives the runner control over managing the truly active tasks.

Please let me know if I am missing something, and your thoughts on the approach.

Thanks,
Ankur
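(For reference, a minimal sketch of the mitigation applied to the example pipeline above; the worker_count spelling follows this thread and may differ across SDK versions, and the local job endpoint is an assumption.)

# Minimal sketch of the mitigation: run the example pipeline on the portable
# Flink runner with more harness threads than concurrent sub-tasks. The
# worker_count option is the one named in this thread and may differ by SDK
# version; the job endpoint below is an assumed local Flink job server.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--worker_count=12',          # > expected concurrent sub-tasks in the pipeline
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.Create(range(100))
     | 'MapA' >> beam.Map(lambda x: (x % 10, x))
     | 'GroupBy' >> beam.GroupByKey()
     | 'MapB' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
     | 'WriteToSink' >> beam.Map(print))    # stand-in for an actual sink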
