I believe that in practice SDK harnesses will fall into one of two capability
classes: they can process an effectively unlimited number of bundles in
parallel, or they can only process a single bundle at a time.

I believe it is more difficult for a runner to handle the latter case well
and to perform all the environment management that would make it efficient.
It may be inefficient for an SDK, but I do believe an SDK should be able to
say that it can't handle more than a single bundle at a time; whether a
runner makes use of that information should be optional.
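
To make that concrete, here is a minimal runner-side sketch. Everything in it
is made up for illustration (it is not the Fn API); it only shows how a runner
could optionally honor a declared bundle limit:

def choose_bundles_to_send(max_concurrent_bundles, in_flight, pending):
    """How many pending bundles the runner may send to this harness right now.

    max_concurrent_bundles=None means the harness declared no limit.
    """
    if max_concurrent_bundles is None:
        return len(pending)                   # effectively infinite: send them all
    budget = max_concurrent_bundles - in_flight
    return max(0, min(budget, len(pending)))  # respect the declared limit

# A single-bundle-at-a-time SDK with one bundle already running gets nothing new:
print(choose_bundles_to_send(1, in_flight=1, pending=["b1", "b2"]))  # -> 0

A runner that ignores the declared limit would simply never call anything like
this and keep its current behavior, which is why honoring it can stay optional.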



On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]> wrote:

> To recap the discussion, it seems we have come up with the following points.
> SDKHarness Management and initialization.
>
>    1. The runner completely owns the work assignment to the SDKHarness.
>    2. The runner should know the capabilities and capacity of the SDKHarness
>    and should assign work accordingly.
>    3. Spinning up SDKHarnesses is the runner's responsibility, and it can be
>    done statically (a fixed, preconfigured number of SDKHarnesses),
>    dynamically, or based on other configuration/logic that the runner
>    chooses.
>
> SDKHarness expectations. This is more in question, and we should outline the
> responsibilities of the SDKHarness.
>
>    1. The SDKHarness should publish how many concurrent tasks it can execute.
>    2. The SDKHarness should start executing all assigned tasks in parallel in
>    a timely manner, or fail the task.
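>
> As a purely hypothetical sketch of point 1 above (none of these names are
> real Fn API fields), the harness could derive its task concurrency from its
> environment once at startup and report it to the runner:
>
> import multiprocessing
>
> def advertised_concurrency(configured_worker_count=None):
>     # A Python harness might cap itself because of the GIL, or honor an
>     # explicitly configured worker count if one was provided.
>     if configured_worker_count is not None:
>         return configured_worker_count
>     return multiprocessing.cpu_count()
>
> registration = {"concurrent_tasks": advertised_concurrency()}  # sent to the runner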
>
> Also, on the simplification side: I think that for better adoption we should
> have a simple SDKHarness as well as simple runner integration, to encourage
> integration with more runners. Many runners might not expose some of their
> internal scheduling characteristics, so we should not expect scheduling
> characteristics as part of runner integration. Moreover, scheduling
> characteristics can change based on pipeline type, infrastructure, available
> resources, etc. So I am a bit hesitant to add runner scheduling specifics to
> runner integration.
> A good balance between SDKHarness complexity and runner integration can make
> adoption easier.
>
> Thanks,
> Ankur
>
> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <[email protected]> wrote:
>
>> Finding a good balance is indeed the art of portability, because the
>> range of capability (and assumptions) on both sides is wide.
>>
>> The original idea was to allow the SDK harness to be an extremely simple
>> bundle executor (specifically, single-threaded execution of one instruction
>> at a time), however inefficient -- a more sophisticated SDK harness would
>> support more features and be more efficient. For the issue described here,
>> it seems problematic to me to send non-executable bundles to the SDK harness
>> under the expectation that the SDK harness will concurrently work its way
>> deeply enough down the instruction queue to unblock itself. That would be an
>> extremely subtle requirement for SDK authors, and one practical question
>> becomes: what should an SDK do with a bundle instruction that it doesn't
>> have capacity to execute? If a runner needs to make such assumptions, I
>> think that information should probably rather be explicit along the lines of
>> proposal 1 -- i.e., some kind of negotiation between the resources allotted
>> to the SDK harness (a preliminary variant is in the provisioning API) and
>> what the SDK harness in return can do (and a valid answer might be: 1 bundle
>> at a time, irrespective of the resources given), or a per-bundle special
>> "overloaded" error response. For other aspects, such as side input
>> readiness, the runner handles that complexity, and the overall bias has
>> generally been to move complexity to the runner side.
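>>
>> To illustrate the "overloaded" idea, here is a purely speculative sketch (no
>> such response exists in the Fn API today, and the harness object is just a
>> stand-in): the SDK rejects a bundle it has no capacity for instead of
>> silently queueing it, and the runner requeues it rather than deadlocking.
>>
>> import collections
>> import time
>>
>> def run_loop(harness, bundles):
>>     """Offer bundles; if the harness reports overload, back off and retry."""
>>     pending = collections.deque(bundles)
>>     while pending:
>>         bundle = pending.popleft()
>>         response = harness.process_bundle(bundle)  # hypothetical call
>>         if response == "OVERLOADED":
>>             pending.append(bundle)   # the SDK refused it instead of queueing it
>>             time.sleep(0.1)          # a real runner would wait for a completion
>>
>> class FakeHarness:                   # stand-in so the sketch runs
>>     def __init__(self):
>>         self.busy = True
>>     def process_bundle(self, bundle):
>>         if self.busy:
>>             self.busy = False
>>             return "OVERLOADED"
>>         return "OK"
>>
>> run_loop(FakeHarness(), ["bundle-1"])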
>>
>> The SDK harness and its initialization overhead are entirely SDK-, job-type-,
>> and even pipeline-specific. A Docker container is also just a process, btw,
>> and doesn't inherently carry much overhead. That said, on a single host, a
>> static Docker configuration is generally a lot simpler to work with.
>>
>> Henning
>>
>>
>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]> wrote:
>>
>>> It is good to see this discussed!
>>>
>>> I think there needs to be a good balance between the SDK harness's
>>> capabilities/complexity and its responsibilities. Additionally, the user
>>> will need to be able to adjust the runner behavior, since the type of
>>> workload executed in the harness is also a factor. Elsewhere we already
>>> discussed that the current assumption of a single SDK harness instance per
>>> Flink task manager brings problems with it, and that there needs to be more
>>> than one way for the runner to spin up SDK harnesses.
>>>
>>> There was the concern that instantiating multiple SDK harnesses per TM host
>>> is expensive (resource usage, initialization time, etc.). That may hold true
>>> for a specific scenario, such as batch workloads and the use of Docker
>>> containers. But it may look totally different for a streaming topology, or
>>> when the SDK harness is just a process on the same host.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> SDK harnesses have always been responsible for executing all work given to
>>>> them concurrently. Runners have been responsible for choosing how much work
>>>> to give to the SDK harness in a way that best utilizes it.
>>>>
>>>> I understand that multithreading in Python is inefficient due to the global
>>>> interpreter lock; it would be up to the runner in this case to make sure
>>>> that the amount of work it gives to each SDK harness best utilizes it,
>>>> while spinning up an appropriate number of SDK harnesses.
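>>>>
>>>> A rough sketch of that trade-off (the numbers and names are illustrative
>>>> only, not how any runner currently decides this):
>>>>
>>>> def plan_harnesses(total_parallelism, sdk="python"):
>>>>     """Return (num_harnesses, threads_per_harness) for one worker host."""
>>>>     if sdk == "python":
>>>>         # GIL: scale out with several single-threaded harness processes.
>>>>         return total_parallelism, 1
>>>>     # A JVM SDK can usually run many bundles inside one process.
>>>>     return 1, total_parallelism
>>>>
>>>> print(plan_harnesses(8, "python"))  # (8, 1)
>>>> print(plan_harnesses(8, "java"))    # (1, 8)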
>>>>
>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Ankur,
>>>>>
>>>>> Thanks for looking into this problem. The cause seems to be Flink's
>>>>> pipelined execution mode. It runs multiple tasks in one task slot and
>>>>> produces a deadlock when the pipelined operators schedule the SDK
>>>>> harness DoFns in non-topological order.
>>>>>
>>>>> The problem would be resolved if we scheduled the tasks in topological
>>>>> order. Doing that is not easy because they run in separate Flink
>>>>> operators and the SDK Harness would have to have insight into the
>>>>> execution graph (which is not desirable).
>>>>>
>>>>> The easiest method, which you proposed in 1), is to ensure that the
>>>>> number of threads in the SDK harness matches the number of
>>>>> ExecutableStage DoFn operators.
>>>>>
>>>>> The approach in 2) is what Flink does as well. It glues together
>>>>> horizontal parts of the execution graph, also in multiple threads. So I
>>>>> agree with your proposed solution.
>>>>>
>>>>> Best,
>>>>> Max
>>>>>
>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>> > Hi,
>>>>> >
>>>>> > tl;dr: Deadlock in task execution caused by limited task parallelism on
>>>>> > the SDKHarness.
>>>>> >
>>>>> > *Setup:*
>>>>> >
>>>>> >   * Job type: *Beam Portable Python Batch* job on a Flink standalone
>>>>> >     cluster.
>>>>> >   * Only a single job is scheduled on the cluster.
>>>>> >   * Everything is running on a single machine with a single Flink task
>>>>> >     manager.
>>>>> >   * Flink task manager slots: 1.
>>>>> >   * Flink parallelism: 1.
>>>>> >   * The Python SDKHarness has 1 thread.
>>>>> >
>>>>> > *Example pipeline:*
>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>> >
>>>>> > *Issue:*
>>>>> > With a multi-stage job, Flink schedules the dependent sub-tasks
>>>>> > concurrently on the Flink worker as long as it can get slots. Each map
>>>>> > task is then executed on the SDKHarness.
>>>>> > It's possible that MapB gets to the SDKHarness before MapA and hence
>>>>> > gets into the execution queue before MapA. Because we only have 1
>>>>> > execution thread on the SDKHarness, MapA will never get a chance to
>>>>> > execute, as MapB will never release the execution thread. MapB will wait
>>>>> > for input from MapA. This gets us to a deadlock in a simple pipeline.
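>>>>> >
>>>>> > A tiny stand-alone repro of the effect (illustrative only, not the
>>>>> > actual SDKHarness code): with a single execution thread, a task that
>>>>> > waits on its upstream's output blocks that upstream from ever running.
>>>>> >
>>>>> > import queue
>>>>> > from concurrent.futures import ThreadPoolExecutor
>>>>> >
>>>>> > a_to_b = queue.Queue()
>>>>> >
>>>>> > def map_a():
>>>>> >     a_to_b.put("element")         # would unblock MapB...
>>>>> >
>>>>> > def map_b():
>>>>> >     return a_to_b.get(timeout=2)  # ...but MapB holds the only thread
>>>>> >
>>>>> > pool = ThreadPoolExecutor(max_workers=1)  # worker_count = 1
>>>>> > fb = pool.submit(map_b)           # MapB happens to arrive first
>>>>> > fa = pool.submit(map_a)           # MapA is stuck in the queue behind it
>>>>> > try:
>>>>> >     print(fb.result())
>>>>> > except queue.Empty:
>>>>> >     print("MapB gave up waiting for MapA -- this is the deadlock")
>>>>> > pool.shutdown()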
>>>>> >
>>>>> > *Mitigation:*
>>>>> > Set worker_count in the pipeline options to more than the expected
>>>>> > number of sub-tasks in the pipeline.
>>>>> >
>>>>> > *Proposal:*
>>>>> >
>>>>> >  1. We can get the maximum concurrency from the runner and make sure
>>>>> >     that we have more threads than the max concurrency. This approach
>>>>> >     assumes that Beam has insight into the runner's execution plan and
>>>>> >     can make decisions based on it.
>>>>> >  2. We dynamically create threads and cache them, with a high upper
>>>>> >     bound, in the SDKHarness (see the sketch after the next paragraph).
>>>>> >     We can warn if we are hitting the upper bound of threads. This
>>>>> >     approach assumes that the runner does a good job of scheduling and
>>>>> >     will distribute tasks more or less evenly.
>>>>> >
>>>>> > We expect good scheduling from runners, so I prefer approach 2. It is
>>>>> > simpler to implement, and the implementation is not runner-specific.
>>>>> > This approach better utilizes resources, as it creates only as many
>>>>> > threads as needed instead of the peak thread requirement.
>>>>> > And last but not least, it gives the runner control over managing truly
>>>>> > active tasks.
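>>>>> >
>>>>> > A rough sketch of approach 2 (the names here are illustrative, not the
>>>>> > actual sdk_worker code): grow a cached pool of threads on demand and
>>>>> > warn when the configured ceiling is reached.
>>>>> >
>>>>> > import logging
>>>>> > import threading
>>>>> > from concurrent.futures import ThreadPoolExecutor
>>>>> >
>>>>> > class BundleProcessorPool(object):
>>>>> >     def __init__(self, max_workers=100):  # the "high upper bound"
>>>>> >         self._max = max_workers
>>>>> >         self._active = 0
>>>>> >         self._lock = threading.Lock()
>>>>> >         # ThreadPoolExecutor already creates threads lazily and caches them.
>>>>> >         self._pool = ThreadPoolExecutor(max_workers=max_workers)
>>>>> >
>>>>> >     def submit(self, bundle_fn):
>>>>> >         with self._lock:
>>>>> >             self._active += 1
>>>>> >             if self._active >= self._max:
>>>>> >                 logging.warning(
>>>>> >                     "All %d worker threads are busy; additional bundles "
>>>>> >                     "will queue and may deadlock if they depend on each "
>>>>> >                     "other.", self._max)
>>>>> >         def run():
>>>>> >             try:
>>>>> >                 return bundle_fn()
>>>>> >             finally:
>>>>> >                 with self._lock:
>>>>> >                     self._active -= 1
>>>>> >         return self._pool.submit(run)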
>>>>> >
>>>>> > Please let me know if I am missing something, and share your thoughts on
>>>>> > the approach.
>>>>> >
>>>>> > Thanks,
>>>>> > Ankur
>>>>> >
>>>>>
>>>>
