I believe that in practice SDK harnesses will fall into one of two capability classes: they can process an effectively unlimited number of bundles in parallel, or they can only process a single bundle at a time.
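To make the two classes concrete, here is a rough, purely illustrative sketch (the names are hypothetical; this is not the Fn API) of how a harness could advertise which class it falls into and how a runner could optionally use that information:

    # Hypothetical sketch -- not the actual Beam Fn API. It only illustrates
    # the two capability classes above and an optional runner-side check.
    from dataclasses import dataclass
    from typing import Optional


    @dataclass(frozen=True)
    class HarnessCapability:
        # None means "effectively unbounded"; 1 means "one bundle at a time".
        max_concurrent_bundles: Optional[int]


    UNBOUNDED = HarnessCapability(max_concurrent_bundles=None)
    SINGLE_BUNDLE = HarnessCapability(max_concurrent_bundles=1)


    def runner_may_send_bundle(capability, in_flight):
        """Runner-side check before dispatching another bundle; honoring it is optional."""
        if capability.max_concurrent_bundles is None:
            return True
        return in_flight < capability.max_concurrent_bundles
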
I believe it is more difficult for a runner to handle the latter case well and to perform all the environment management that would make it efficient. It may be inefficient for an SDK, but I do believe an SDK should be able to say "I'm not great at handling more than a single bundle at a time"; whether a runner makes use of that information should be optional.

On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:

> To recap the discussion, it seems that we have come up with the following points.
>
> SDKHarness management and initialization:
>
> 1. The runner completely owns work assignment to the SDKHarness.
> 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly.
> 3. Spinning up SDKHarnesses is the runner's responsibility, and it can be done statically (a fixed, pre-configured number of SDKHarnesses), dynamically, or based on whatever other configuration/logic the runner chooses.
>
> SDKHarness expectations. This is more in question, and we should outline the responsibilities of the SDKHarness:
>
> 1. The SDKHarness should publish how many concurrent tasks it can execute.
> 2. The SDKHarness should start executing all assigned tasks in parallel in a timely manner, or fail the task.
>
> Also, on the simplification side: I think that for better adoption we should have a simple SDKHarness as well as simple runner integration, to encourage integration with more runners. Many runners might not expose some of their internal scheduling characteristics, so we should not expect scheduling characteristics for runner integration. Moreover, scheduling characteristics can change based on pipeline type, infrastructure, available resources, etc. So I am a bit hesitant to add runner scheduling specifics to runner integration.
> A good balance between SDKHarness complexity and runner integration can help ease adoption.
>
> Thanks,
> Ankur
>
> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:
>
>> Finding a good balance is indeed the art of portability, because the range of capabilities (and assumptions) on both sides is wide.
>>
>> It was originally the idea to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time), however inefficient -- a more sophisticated SDK harness would support more features and be more efficient. For the issue described here, it seems problematic to me to send non-executable bundles to the SDK harness under the expectation that the SDK harness will concurrently work its way deeply enough down the instruction queue to unblock itself. That would be an extremely subtle requirement for SDK authors, and one practical question becomes: what should an SDK do with a bundle instruction that it doesn't have the capacity to execute? If a runner needs to make such assumptions, I think that information should probably rather be explicit, along the lines of proposal 1 -- i.e., some kind of negotiation between the resources allotted to the SDK harness (a preliminary variant is in the provisioning API) and what the SDK harness can do in return (and a valid answer might be: 1 bundle at a time irrespective of the resources given), or a per-bundle special "overloaded" error response. For other aspects, such as side input readiness, the runner handles that complexity, and the overall bias has generally been to move complexity to the runner side.
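To illustrate the per-bundle "overloaded" error response Henning mentions above, here is a rough runner-side sketch; OverloadedError and the other names are hypothetical, not existing Fn API messages:

    # Illustrative only: a hypothetical per-bundle "overloaded" response. The
    # harness rejects a bundle it has no capacity for, and the runner backs
    # off or reroutes instead of assuming the harness will queue it.
    import time


    class OverloadedError(Exception):
        """Raised by a harness that cannot take another bundle right now."""


    def submit_with_backoff(harness, bundle, max_attempts=5, base_delay_s=0.1):
        for attempt in range(max_attempts):
            try:
                return harness.process_bundle(bundle)  # hypothetical call
            except OverloadedError:
                # Back off, or hand the bundle to another harness instance.
                time.sleep(base_delay_s * (2 ** attempt))
        raise RuntimeError("no harness capacity for bundle %r" % bundle)
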
>>
>> The SDK harness and initialization overhead is entirely SDK-, job-type- and even pipeline-specific. A Docker container is also just a process, btw, and doesn't inherently carry much overhead. That said, on a single host, a static Docker configuration is generally a lot simpler to work with.
>>
>> Henning
>>
>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:
>>
>>> It is good to see this discussed!
>>>
>>> I think there needs to be a good balance between the SDK harness capabilities/complexity and its responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere we have already discussed that the current assumption of a single SDK harness instance per Flink task manager brings problems with it, and that there needs to be more than one way for the runner to spin up SDK harnesses.
>>>
>>> There was the concern that instantiating multiple SDK harnesses per TM host is expensive (resource usage, initialization time, etc.). That may hold true for a specific scenario, such as batch workloads and the use of Docker containers. But it may look totally different for a streaming topology or when the SDK harness is just a process on the same host.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> SDK harnesses have always been responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in a way that best utilizes it.
>>>>
>>>> I understand that multithreading in Python is inefficient due to the global interpreter lock; it would be up to the runner in this case to make sure that the amount of work it gives to each SDK harness best utilizes it, while spinning up an appropriate number of SDK harnesses.
>>>>
>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]> wrote:
>>>>
>>>>> Hi Ankur,
>>>>>
>>>>> Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order.
>>>>>
>>>>> The problem would be resolved if we scheduled the tasks in topological order. Doing that is not easy because they run in separate Flink operators and the SDK harness would have to have insight into the execution graph (which is not desirable).
>>>>>
>>>>> The easiest method, which you proposed in 1), is to ensure that the number of threads in the SDK harness matches the number of ExecutableStage DoFn operators.
>>>>>
>>>>> The approach in 2) is what Flink does as well. It glues together horizontal parts of the execution graph, also in multiple threads. So I agree with your proposed solution.
>>>>>
>>>>> Best,
>>>>> Max
>>>>>
>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>> > Hi,
>>>>> >
>>>>> > tl;dr: Deadlock in task execution caused by limited task parallelism on the SDKHarness.
>>>>> >
>>>>> > *Setup:*
>>>>> >
>>>>> >   * Job type: *Beam Portable Python Batch* job on a Flink standalone cluster.
>>>>> >   * Only a single job is scheduled on the cluster.
>>>>> >   * Everything is running on a single machine with a single Flink task manager.
>>>>> >   * Flink Task Manager Slots is 1.
>>>>> >   * Flink Parallelism is 1.
>>>>> >   * The Python SDKHarness has 1 thread.
>>>>> >
>>>>> > *Example pipeline:*
>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>> >
>>>>> > *Issue:*
>>>>> > With a multi-stage job, Flink schedules the different dependent sub-tasks concurrently on the Flink worker as long as it can get slots. Each map task is then executed on the SDKHarness.
>>>>> > It is possible that MapB gets to the SDKHarness before MapA and hence gets into the execution queue before MapA. Because we only have 1 execution thread on the SDKHarness, MapA will never get a chance to execute, as MapB will never release the execution thread while it waits for input from MapA. This gets us to a deadlock in a simple pipeline.
>>>>> >
>>>>> > *Mitigation:*
>>>>> > Set worker_count in the pipeline options to more than the expected number of sub-tasks in the pipeline.
>>>>> >
>>>>> > *Proposal:*
>>>>> >
>>>>> > 1. We can get the maximum concurrency from the runner and make sure that we have more threads than the max concurrency. This approach assumes that Beam has insight into the runner's execution plan and can make decisions based on it.
>>>>> > 2. We dynamically create threads and cache them, with a high upper bound, in the SDKHarness. We can warn if we are hitting the upper bound of threads. This approach assumes that the runner does a good job of scheduling and will distribute tasks more or less evenly.
>>>>> >
>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is simpler to implement and the implementation is not runner specific. This approach makes better use of resources, as it creates only as many threads as are needed instead of the peak thread requirement. And last but not least, it gives the runner control over managing the truly active tasks.
>>>>> >
>>>>> > Please let me know if I am missing something, and your thoughts on the approach.
>>>>> >
>>>>> > Thanks,
>>>>> > Ankur
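For what it is worth, here is a minimal sketch of what Ankur's approach 2 above could look like in a Python harness (the names and structure are hypothetical, not the actual sdk_worker code): bundle threads are created on demand, idle ones are cached for reuse, and a warning is logged when a configurable upper bound is reached.

    # Hypothetical sketch of approach 2; not the actual SDK harness code.
    import logging
    import queue
    import threading


    class BundleThreadPool:
        """Grows a cached pool of bundle-processing threads on demand."""

        def __init__(self, max_threads=100):
            self._max_threads = max_threads
            self._idle = queue.Queue()   # work queues of idle, cached threads
            self._created = 0
            self._lock = threading.Lock()

        def submit(self, bundle_fn):
            """Run bundle_fn on an idle cached thread, creating one if needed."""
            try:
                work_queue = self._idle.get_nowait()
            except queue.Empty:
                work_queue = self._spawn_worker()
            work_queue.put(bundle_fn)

        def _spawn_worker(self):
            with self._lock:
                self._created += 1
                if self._created >= self._max_threads:
                    logging.warning(
                        "Reached the upper bound of %d bundle threads; the "
                        "runner may be scheduling more concurrent work than "
                        "expected.", self._max_threads)
            work_queue = queue.Queue()

            def loop():
                while True:
                    fn = work_queue.get()
                    try:
                        fn()
                    finally:
                        # Return this thread to the cache for reuse.
                        self._idle.put(work_queue)

            threading.Thread(target=loop, daemon=True).start()
            return work_queue

Because threads are only created when a bundle actually arrives and idle threads are reused, the pool stays close to the number of truly concurrent tasks rather than the peak requirement, which matches the resource argument made for approach 2 above.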
