Henning, can you clarify what you mean by sending non-executable bundles to the SDK harness, and how it is useful for Flink?
On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <[email protected]> wrote:
> I think it will be useful to the runner to know upfront what the fundamental threading capabilities are for the SDK harness (say, "fixed", "linear", "dynamic", ...) so that the runner can make a good static decision upfront on the number of harnesses and how many resources they should each have. It's wasteful to give the Foo SDK a whole many-core machine with TBs of memory if it can only support a single bundle at a time. I think this is also in line with what Thomas and Luke are suggesting.
> However, it still seems to me to be a semantically problematic idea to send non-executable bundles to the SDK harness. I understand it's useful for Flink, but is that really the best path forward?

> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <[email protected]> wrote:
>> That's right.
>> To add to it: we added multi-threading to Python streaming as a single thread is suboptimal for the streaming use case.
>> Shall we move towards a conclusion on the SDK bundle processing upper bound?

>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <[email protected]> wrote:
>>> Ankur, I can see where you are going with your argument. I believe there is certain information which is static and won't change at pipeline creation time (such as the Python SDK being most efficient doing one bundle at a time) and some stuff which is best known at runtime, like memory and CPU limits, worker count.

>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]> wrote:
>>>> I would prefer to keep it dynamic as it can be changed by the infrastructure or the pipeline author.
>>>> For example, in the case of Python the number of concurrent bundles can be changed by setting the pipeline option worker_count, and for Java it can be computed based on the CPUs on the machine.
>>>> For the Flink runner, we can use the worker_count parameter for now to increase the parallelism. And we can have 1 container for each mapPartition task on Flink while reusing containers, as container creation is expensive, especially for Python where it installs a bunch of dependencies. There is 1 caveat though: I have seen machine crashes because of too many simultaneous container creations. We can rate-limit container creation in the code to avoid this.
>>>> Thanks,
>>>> Ankur
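A minimal sketch of the worker_count mitigation Ankur mentions above. The option name comes from the thread itself; how the flag is registered and interpreted may differ across Beam versions, and the runner/endpoint flags are only an example portable-runner setup, so treat the exact values as illustrative:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative only: raise the SDK harness thread count via the worker_count
# pipeline option discussed above, so every concurrently scheduled
# ExecutableStage can get its own thread. PipelineOptions keeps unknown
# flags, so worker_count is simply passed through to the harness.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--worker_count=12",
])
```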
>>>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:
>>>>> +1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information.
>>>>> Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline creation. I wouldn't stick this into PipelineOptions for the long term. If you have time to capture these thoughts and update the environment proto, I would suggest going down that path. Otherwise anything short term like PipelineOptions will do.

>>>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:
>>>>>> For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission?
>>>>>> Regarding the use of gRPC headers: why not make this explicit in the proto instead?
>>>>>> WRT the runner dictating resource constraints: the runner actually may also not have that information. It would need to be supplied as part of the pipeline options? The cluster resource manager needs to allocate resources for both the runner and the SDK harness(es).
>>>>>> Finally, what can be done to unblock the Flink runner / Python until the solution discussed here is in place? An extra runner option for SDK singleton on/off?

>>>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:
>>>>>>> Sounds good to me.
>>>>>>> A gRPC header on the control channel seems to be a good place to add the upper bound information.
>>>>>>> Added jiras:
>>>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>>>> https://issues.apache.org/jira/browse/BEAM-5167

>>>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]> wrote:
>>>>>>>> Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning API. The SDK harness need not -- and should not -- speculate on what else might be running on the machine:
>>>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>>> A realistic startup-time computation in the SDK harness would be something simple like max(1, min(cpu*100, mem_mb/10)), say, and use at most that number of threads. Or just hardcode it to 300. Or use a user-provided value. Whatever the value, it is the maximum number of bundles in flight allowed at any given time and needs to be communicated to the runner via some message. Anything beyond that would be rejected (but this shouldn't happen, because the runner should respect that number).
>>>>>>>> A dynamic computation would use the same limits from the SDK, but take into account its own resource usage (incl. the usage by running bundles).
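A minimal sketch of the startup-time computation Henning describes just above, assuming the cpu/memory figures come from the provisioning API; the function name and parameters are illustrative, not an actual Beam API:

```python
# Illustrative sketch of the startup-time thread cap described above:
# max(1, min(cpu*100, mem_mb/10)), optionally overridden by the user.
def max_concurrent_bundles(cpu_cores, mem_mb, user_override=None):
    if user_override is not None:
        return max(1, user_override)
    return max(1, min(int(cpu_cores * 100), int(mem_mb // 10)))

# Example: a harness provisioned with 4 cores and 16 GiB of memory would
# advertise max_concurrent_bundles(4, 16 * 1024) == 400 bundles in flight.
```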
>>>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>> I am thinking of the upper bound more along the lines of a theoretical upper limit, or any other static high value, beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish; it will simply reject any additional bundle.
>>>>>>>>> Beyond this I don't have a good answer for a dynamic upper bound. As the SDK does not have the complete picture of the processes on the machine with which it shares resources, resources might not be a good proxy for an upper bound from the SDK's point of view.

>>>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>> Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)?
>>>>>>>>>> The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information and the runner would make a judgement call as to whether it should ask the SDK to do more work or less, but it's not an explicit "don't do more than X bundles in parallel".

>>>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>> Makes sense. Exposing an upper bound on concurrency along with an optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. An SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple.
>>>>>>>>>>> Runner introspection of the upper bound on concurrency is important for correctness, while introspection of the optimum concurrency is important for efficiency. This separates the efficiency and correctness requirements.

>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>>> I agree with Luke's observation, with the caveat that "an infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness or an outright OOM. Unless it's always 1, a reasonable upper bound will either have to be provided by the user or computed from the mem/cpu resources given. Of course, some bundles take more resources than others, so any static value will be an estimate or ignore resource limits.
>>>>>>>>>>>> That said, I do not like that an "efficiency" aspect becomes a subtle requirement for correctness due to Flink internals. I fear that road leads to trouble.

>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>> The latter case -- supporting single-bundle execution at a time on the SDK, with the runner not using this flag -- is exactly the reason we got into the deadlock here.
>>>>>>>>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether to use it, but at the same time expecting the SDK to handle an unbounded number of bundles even if it's not efficient.
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Ankur

>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of two capabilities: they can effectively process an infinite amount of bundles in parallel, or they can only process a single bundle at a time.
>>>>>>>>>>>>>> I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make that efficient.
>>>>>>>>>>>>>> It may be inefficient for an SDK, but I do believe it should be able to say that it's not great at anything more than a single bundle at a time; utilizing this information by a runner should be optional.

>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:
>>>>>>>>>>>>>>> To recap the discussion, it seems that we have come up with the following points.
>>>>>>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>>>>>> 1. The runner completely owns the work assignment to the SDKHarness.
>>>>>>>>>>>>>>> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
>>>>>>>>>>>>>>> 3. Spinning up SDKHarnesses is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>>>>>>>>>>>>>>> SDKHarness expectations (this is more in question, and we should outline the responsibilities of the SDKHarness):
>>>>>>>>>>>>>>> 1. The SDKHarness should publish how many concurrent tasks it can execute.
>>>>>>>>>>>>>>> 2. The SDKHarness should start executing all the task items assigned, in parallel, in a timely manner, or fail the task.
>>>>>>>>>>>>>>> Also, on the simplification side: I think for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics for runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
>>>>>>>>>>>>>>> A good balance between SDKHarness complexity and runner integration can help with easier adoption.
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Ankur

>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>>>>>>>>> Finding a good balance is indeed the art of portability, because the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>>>>>>>> It was originally the idea to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning API) and what the SDK harness in return can do (and a valid answer might be: 1 bundle at a time irrespective of the resources given) -- or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
>>>>>>>>>>>>>>>> The SDK harness and its initialization overhead are entirely SDK-, job-type- and even pipeline-specific. A docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static docker configuration is generally a lot simpler to work with.
>>>>>>>>>>>>>>>> Henning
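As an illustration of the "reject rather than queue" behaviour discussed in this part of the thread (Ankur's verbose rejection and Henning's per-bundle "overloaded" error response), a small sketch; the class and method names are stand-ins, not the actual Fn API control-plane code:

```python
import threading

# Illustrative only: gate process-bundle requests against the advertised
# upper bound and fail fast when the harness is at capacity, instead of
# queueing a bundle behind ones it cannot yet execute.
class BundleGate:
    def __init__(self, max_concurrent_bundles):
        self._slots = threading.Semaphore(max_concurrent_bundles)

    def try_start_bundle(self):
        # Non-blocking acquire: False means "overloaded", and the caller
        # should answer the runner with an explicit error response.
        return self._slots.acquire(blocking=False)

    def finish_bundle(self):
        self._slots.release()
```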
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:
>>>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it and that there needs to be more than one way for the runner to spin up SDK harnesses.
>>>>>>>>>>>>>>>>> There was the concern that instantiation of multiple SDK harnesses per TM host is expensive (resource usage, initialization time etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology or when the SDK harness is just a process on the same host.
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness.
>>>>>>>>>>>>>>>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.

>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy because they run in separate Flink operators and the SDK harness would have to have insight into the execution graph (which is not desirable).
>>>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>>>> > tl;dr: Deadlock in task execution caused by limited task parallelism on the SDKHarness.
>>>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>>>> > * Job type: Beam Portable Python Batch job on a Flink standalone cluster.
>>>>>>>>>>>>>>>>>>> > * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>>>> > * Everything is running on a single machine with a single Flink task manager.
>>>>>>>>>>>>>>>>>>> > * Flink task manager slots is 1.
>>>>>>>>>>>>>>>>>>> > * Flink parallelism is 1.
>>>>>>>>>>>>>>>>>>> > * The Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules the dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness.
>>>>>>>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread while it waits for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.
>>>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>>>>>>>>>>>>>>>> > 2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>>>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement and the implementation is not runner specific. This approach better utilizes resources as it creates only as many threads as needed instead of the peak thread requirement.
>>>>>>>>>>>>>>>>>>> > And last but not least, it gives the runner control over managing truly active tasks.
>>>>>>>>>>>>>>>>>>> > Please let me know if I am missing something, and your thoughts on the approach.
>>>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>>>> > Ankur
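A sketch of proposal 2 above: threads created on demand, cached and reused, capped by a high upper bound, with a warning when the cap is hit. ThreadPoolExecutor already creates and reuses worker threads lazily; the class and method names here are illustrative, not Beam SDK harness code:

```python
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch of proposal 2: grow bundle-processing threads on
# demand, reuse them, and warn once the configured upper bound is reached.
class GrowableBundleProcessorPool:
    def __init__(self, max_threads=300):
        self._max_threads = max_threads
        self._active = 0
        self._lock = threading.Lock()
        # Threads are created lazily up to max_workers and then cached.
        self._executor = ThreadPoolExecutor(max_workers=max_threads)

    def submit(self, process_bundle_fn, *args):
        with self._lock:
            self._active += 1
            if self._active >= self._max_threads:
                logging.warning(
                    "SDK harness hit its thread upper bound (%d); "
                    "further bundles will queue.", self._max_threads)
        future = self._executor.submit(process_bundle_fn, *args)
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, _future):
        with self._lock:
            self._active -= 1
```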
