Henning, can you clarify what you mean by sending non-executable bundles
to the SDK harness and how it is useful for Flink?

On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <[email protected]> wrote:

> I think it will be useful to the runner to know upfront what the
> fundamental threading capabilities are for the SDK harness (say, "fixed",
> "linear", "dynamic", ..) so that the runner can upfront make a good static
> decision on #harnesses and how many resources they should each have. It's
> wasteful to give the Foo SDK a whole many-core machine with TBs of memory,
> if it can only support a single bundle at a time. I think this is also in
> line with what Thomas and Luke are suggesting.
>
> However, it still seems to me to be a semantically problematic idea to
> send non-executable bundles to the SDK harness. I understand it's useful
> for Flink, but is that really the best path forward?
>
>
>
> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <[email protected]> wrote:
>
>> That's right.
>> To add to it, we added multithreading to Python streaming because a single
>> thread is suboptimal for the streaming use case.
>> Shall we move towards a conclusion on the SDK bundle processing upper
>> bound?
>>
>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Ankur, I can see where you are going with your argument. I believe there
>>> is certain information which is static and won't change at pipeline
>>> creation time (such as Python SDK is most efficient doing one bundle at a
>>> time) and some which is best determined at runtime, like memory and CPU
>>> limits or worker count.
>>>
>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <[email protected]> wrote:
>>>
>>>> I would prefer to keep it dynamic as it can be changed by the
>>>> infrastructure or the pipeline author.
>>>> For example, in the case of Python, the number of concurrent bundles can be
>>>> changed by setting the pipeline option worker_count. And for Java it can be
>>>> computed based on the CPUs on the machine.
>>>>
>>>> For the Flink runner, we can use the worker_count parameter for now to
>>>> increase the parallelism. And we can have one container for each
>>>> mapPartition task on Flink while reusing containers, as container creation
>>>> is expensive, especially for Python where it installs a bunch of
>>>> dependencies. There is one caveat though: I have seen machine crashes
>>>> caused by too many simultaneous container creations. We can rate limit
>>>> container creation in the code to avoid this.
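
One possible reading of the rate-limiting idea above, as a minimal sketch: cap how many container creations may be in progress at the same time. ContainerCreationLimiter and start_container are hypothetical names used only for illustration; they are not part of Beam or the Flink runner, and treating "rate limit" as a cap on simultaneous creations is an assumption.

```python
import threading


class ContainerCreationLimiter:
    """Hypothetical helper: allows at most N container creations at once."""

    def __init__(self, max_concurrent_creations):
        # The semaphore holds one permit per creation allowed in parallel.
        self._permits = threading.Semaphore(max_concurrent_creations)

    def create(self, start_container):
        # Callers beyond the limit block here until a permit is released.
        with self._permits:
            return start_container()
```

Each mapPartition task would call limiter.create(...) with whatever callable actually starts its container, so a burst of tasks waits its turn instead of starting every container at once.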
>>>>
>>>> Thanks,
>>>> Ankur
>>>>
>>>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> +1 on making the resources part of a proto. Based upon what Henning
>>>>> linked to, the provisioning API seems like an appropriate place to provide
>>>>> this information.
>>>>>
>>>>> Thomas, I believe the environment proto is the best place to add
>>>>> information that a runner may want to know about upfront during
>>>>> pipeline creation. I wouldn't stick this into PipelineOptions for the long
>>>>> term.
>>>>> If you have time to capture these thoughts and update the environment
>>>>> proto, I would suggest going down that path. Otherwise anything short term
>>>>> like PipelineOptions will do.
>>>>>
>>>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:
>>>>>
>>>>>> For SDKs where the upper limit is constant and known upfront, why not
>>>>>> communicate this along with the other harness resource info as part of
>>>>>> the job submission?
>>>>>>
>>>>>> Regarding use of GRPC headers: Why not make this explicit in the
>>>>>> proto instead?
>>>>>>
>>>>>> WRT runner dictating resource constraints: The runner actually may
>>>>>> also not have that information. It would need to be supplied as part of
>>>>>> the pipeline options? The cluster resource manager needs to allocate
>>>>>> resources for both the runner and the SDK harness(es).
>>>>>>
>>>>>> Finally, what can be done to unblock the Flink runner / Python until
>>>>>> the solution discussed here is in place? An extra runner option for SDK
>>>>>> singleton on/off?
>>>>>>
>>>>>>
>>>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Sounds good to me.
>>>>>>> GRPC Header of the control channel seems to be a good place to add
>>>>>>> upper bound information.
>>>>>>> Added jiras:
>>>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>>>> https://issues.apache.org/jira/browse/BEAM-5167
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Regarding resources: the runner can currently dictate the
>>>>>>>> mem/cpu/disk resources that the harness is allowed to use via the
>>>>>>>> provisioning api. The SDK harness need not -- and should not --
>>>>>>>> speculate on what else might be running on the machine:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>>>
>>>>>>>> A realistic startup-time computation in the SDK harness would be
>>>>>>>> something simple like: max(1, min(cpu*100, mem_mb/10)) say, and use
>>>>>>>> at most that number of threads. Or just hardcode to 300. Or a
>>>>>>>> user-provided value. Whatever the value is, it is the maximum number of
>>>>>>>> bundles in flight allowed at any given time and needs to be communicated
>>>>>>>> to the runner via some message. Anything beyond would be rejected (but
>>>>>>>> this shouldn't happen, because the runner should respect that number).
>>>>>>>>
>>>>>>>> A dynamic computation would use the same limits from the SDK, but
>>>>>>>> take into account its own resource usage (incl. the usage by running
>>>>>>>> bundles).
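
The startup-time computation above can be sketched as follows. This is only an illustration of the quoted formula: the function name, the parameter names, and the choice of cores and megabytes as units are assumptions, and integer truncation is used so the result is a whole number of threads.

```python
from typing import Optional


def max_bundles_in_flight(cpu: float, mem_mb: int,
                          user_value: Optional[int] = None) -> int:
    """Static upper bound on concurrent bundles for one SDK harness.

    cpu and mem_mb stand for the limits handed out via the provisioning
    API; user_value models the "user-provided value" alternative.
    """
    if user_value is not None:
        # Never advertise fewer than one bundle at a time.
        return max(1, user_value)
    # max(1, min(cpu*100, mem_mb/10)), truncated to whole threads.
    return max(1, min(int(cpu * 100), mem_mb // 10))
```

For example, 4 cores with 50 MB gives 5 (memory-bound), while 1 core with 4096 MB gives 100 (CPU-bound). Whatever number comes out is what the harness would report to the runner.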
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am thinking of the upper bound more along the lines of a theoretical
>>>>>>>>> upper limit or any other static high value beyond which the SDK will
>>>>>>>>> reject bundles verbosely. The idea is that the SDK will not keep bundles
>>>>>>>>> in a queue while waiting on current bundles to finish. It will simply
>>>>>>>>> reject any additional bundle.
>>>>>>>>> Beyond this I don't have a good answer for a dynamic upper bound. As
>>>>>>>>> the SDK does not have the complete picture of the processes on the
>>>>>>>>> machine with which it shares resources, resources might not be a good
>>>>>>>>> proxy for the upper bound from the SDK's point of view.
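
The "reject instead of queue" behaviour described above could look roughly like this. It is a sketch under assumptions, not the Python SDK harness: BoundedBundleExecutor and BundleRejected are hypothetical names, and a bundle is modelled as a plain callable.

```python
import threading


class BundleRejected(Exception):
    """Raised when the harness is already at its advertised upper bound."""


class BoundedBundleExecutor:
    """Hypothetical executor: runs bundles in threads, never queues them."""

    def __init__(self, upper_bound):
        self.upper_bound = upper_bound
        self._slots = threading.BoundedSemaphore(upper_bound)

    def submit(self, bundle_fn):
        # Non-blocking acquire: if no slot is free, fail loudly right away.
        if not self._slots.acquire(blocking=False):
            raise BundleRejected(
                "already processing %d bundles" % self.upper_bound)

        def run():
            try:
                bundle_fn()
            finally:
                # Free the slot even if the bundle raised.
                self._slots.release()

        thread = threading.Thread(target=run, daemon=True)
        thread.start()
        return thread
```

Because a bundle is either started immediately or refused, nothing can sit in a queue behind a bundle that is blocked waiting for upstream data.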
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ankur, how would you expect an SDK to compute a realistic upper
>>>>>>>>>> bound (upfront or during pipeline computation)?
>>>>>>>>>>
>>>>>>>>>> First thought that came to my mind was that the SDK would provide
>>>>>>>>>> CPU/memory/... resourcing information and the runner would make a
>>>>>>>>>> judgement call as to whether it should ask the SDK to do more work or
>>>>>>>>>> less, but it's not an explicit "don't do more than X bundles in
>>>>>>>>>> parallel".
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Makes sense. Exposing an upper bound on concurrency together with the
>>>>>>>>>>> optimum concurrency can give a good balance. This is good
>>>>>>>>>>> information to expose while keeping the requirements on the SDK
>>>>>>>>>>> simple. An SDK can publish 1 as both the optimum concurrency and the
>>>>>>>>>>> upper bound to keep things simple.
>>>>>>>>>>>
>>>>>>>>>>> Runner introspection of the upper bound on concurrency is important
>>>>>>>>>>> for correctness, while introspection of the optimum concurrency is
>>>>>>>>>>> important for efficiency. This separates efficiency and correctness
>>>>>>>>>>> requirements.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>>>>>>>>> amount of bundles in parallel" is limited by the available
>>>>>>>>>>>> resources. For example, the Go SDK harness will accept an arbitrary
>>>>>>>>>>>> amount of parallel work, but too much work will cause either
>>>>>>>>>>>> excessive GC pressure with crippling slowness or an outright OOM.
>>>>>>>>>>>> Unless it's always 1, a reasonable upper bound will either have to
>>>>>>>>>>>> be provided by the user or computed from the mem/cpu resources
>>>>>>>>>>>> given. Of course, as some bundles take more resources than others,
>>>>>>>>>>>> any static value will be an estimate or ignore resource limits.
>>>>>>>>>>>>
>>>>>>>>>>>> That said, I do not like that an "efficiency" aspect becomes a
>>>>>>>>>>>> subtle requirement for correctness due to Flink internals. I fear
>>>>>>>>>>>> that road leads to trouble.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The latter case, where the SDK supports only single-bundle
>>>>>>>>>>>>> execution at a time and the runner does not use this flag, is
>>>>>>>>>>>>> exactly the reason we got into the deadlock here.
>>>>>>>>>>>>> I agree with exposing the SDK's optimum concurrency level (1 in
>>>>>>>>>>>>> the latter case) and letting the runner decide whether to use it.
>>>>>>>>>>>>> But at the same time we should expect the SDK to handle an
>>>>>>>>>>>>> infinite number of bundles even if it's not efficient.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of two
>>>>>>>>>>>>>> categories: they can process effectively an infinite number of
>>>>>>>>>>>>>> bundles in parallel, or they can only process a single bundle at
>>>>>>>>>>>>>> a time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe it is more difficult for a runner to handle the
>>>>>>>>>>>>>> latter case well and to perform all the environment management
>>>>>>>>>>>>>> that would make that efficient. It may be inefficient for an SDK,
>>>>>>>>>>>>>> but I do believe it should be able to say "I'm not great at
>>>>>>>>>>>>>> anything more than a single bundle at a time", while utilizing
>>>>>>>>>>>>>> this information should be optional for a runner.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To recap the discussion, it seems that we have come up with the
>>>>>>>>>>>>>>> following points.
>>>>>>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. The runner completely owns the work assignment to the
>>>>>>>>>>>>>>>    SDKHarness.
>>>>>>>>>>>>>>>    2. The runner should know the capabilities and capacity of the
>>>>>>>>>>>>>>>    SDKHarness and should assign work accordingly.
>>>>>>>>>>>>>>>    3. Spinning up the SDKHarness is the runner's responsibility,
>>>>>>>>>>>>>>>    and it can be done statically (a fixed, preconfigured number
>>>>>>>>>>>>>>>    of SDKHarnesses), dynamically, or based on other
>>>>>>>>>>>>>>>    configuration/logic that the runner chooses.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> SDKHarness expectations. This is more in question, and we
>>>>>>>>>>>>>>> should outline the responsibilities of the SDKHarness.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. The SDKHarness should publish how many concurrent tasks
>>>>>>>>>>>>>>>    it can execute.
>>>>>>>>>>>>>>>    2. The SDKHarness should start executing all the assigned
>>>>>>>>>>>>>>>    tasks in parallel in a timely manner or fail the task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, on the simplification side: I think for better adoption
>>>>>>>>>>>>>>> we should have a simple SDKHarness as well as simple runner
>>>>>>>>>>>>>>> integration, to encourage integration with more runners. Also,
>>>>>>>>>>>>>>> many runners might not expose some of their internal scheduling
>>>>>>>>>>>>>>> characteristics, so we should not expect scheduling
>>>>>>>>>>>>>>> characteristics for runner integration. Moreover, scheduling
>>>>>>>>>>>>>>> characteristics can change based on pipeline type,
>>>>>>>>>>>>>>> infrastructure, available resources etc. So I am a bit hesitant
>>>>>>>>>>>>>>> to add runner scheduling specifics for runner integration.
>>>>>>>>>>>>>>> A good balance between SDKHarness complexity and runner
>>>>>>>>>>>>>>> integration can be helpful for easier adoption.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Finding a good balance is indeed the art of portability,
>>>>>>>>>>>>>>>> because the range of capability (and assumptions) on both
>>>>>>>>>>>>>>>> sides is wide.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It was originally the idea to allow the SDK harness to be
>>>>>>>>>>>>>>>> an extremely simple bundle executor (specifically,
>>>>>>>>>>>>>>>> single-threaded execution one instruction at a time) however
>>>>>>>>>>>>>>>> inefficient -- a more sophisticated SDK harness would support
>>>>>>>>>>>>>>>> more features and be more efficient. For the issue described
>>>>>>>>>>>>>>>> here, it seems problematic to me to send non-executable bundles
>>>>>>>>>>>>>>>> to the SDK harness under the expectation that the SDK harness
>>>>>>>>>>>>>>>> will concurrently work its way deeply enough down the
>>>>>>>>>>>>>>>> instruction queue to unblock itself. That would be an extremely
>>>>>>>>>>>>>>>> subtle requirement for SDK authors, and one practical question
>>>>>>>>>>>>>>>> becomes: what should an SDK do with a bundle instruction that
>>>>>>>>>>>>>>>> it doesn't have capacity to execute? If a runner needs to make
>>>>>>>>>>>>>>>> such assumptions, I think that information should probably
>>>>>>>>>>>>>>>> rather be explicit along the lines of proposal 1 -- i.e., some
>>>>>>>>>>>>>>>> kind of negotiation between the resources allotted to the SDK
>>>>>>>>>>>>>>>> harness (a preliminary variant is in the provisioning api) and
>>>>>>>>>>>>>>>> what the SDK harness in return can do (and a valid answer might
>>>>>>>>>>>>>>>> be: 1 bundle at a time irrespective of resources given) or a
>>>>>>>>>>>>>>>> per-bundle special "overloaded" error response. For other
>>>>>>>>>>>>>>>> aspects, such as side input readiness, the runner handles that
>>>>>>>>>>>>>>>> complexity, and the overall bias has generally been to move
>>>>>>>>>>>>>>>> complexity to the runner side.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The SDK harness and initialization overhead is entirely
>>>>>>>>>>>>>>>> SDK, job type and even pipeline specific. A docker container
>>>>>>>>>>>>>>>> is also just a process, btw, and doesn't inherently carry much
>>>>>>>>>>>>>>>> overhead. That said, on a single host, a static docker
>>>>>>>>>>>>>>>> configuration is generally a lot simpler to work with.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Henning
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think there needs to be a good balance between the SDK
>>>>>>>>>>>>>>>>> harness capabilities/complexity and responsibilities.
>>>>>>>>>>>>>>>>> Additionally the user will need to be able to adjust the
>>>>>>>>>>>>>>>>> runner behavior, since the type of workload executed in the
>>>>>>>>>>>>>>>>> harness also is a factor. Elsewhere we already discussed that
>>>>>>>>>>>>>>>>> the current assumption of a single SDK harness instance per
>>>>>>>>>>>>>>>>> Flink task manager brings problems with it and that there
>>>>>>>>>>>>>>>>> needs to be more than one way for the runner to spin up SDK
>>>>>>>>>>>>>>>>> harnesses.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> There was the concern that instantiation of multiple SDK
>>>>>>>>>>>>>>>>> harnesses per TM host is expensive (resource usage,
>>>>>>>>>>>>>>>>> initialization time etc.). That may hold true for a specific
>>>>>>>>>>>>>>>>> scenario, such as batch workloads and the use of Docker
>>>>>>>>>>>>>>>>> containers. But it may look totally different for a streaming
>>>>>>>>>>>>>>>>> topology or when the SDK harness is just a process on the same
>>>>>>>>>>>>>>>>> host.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing all
>>>>>>>>>>>>>>>>>> work given to them concurrently. Runners have been
>>>>>>>>>>>>>>>>>> responsible for choosing how much work to give to the SDK
>>>>>>>>>>>>>>>>>> harness in such a way that best utilizes the SDK harness.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I understand that multithreading in Python is inefficient
>>>>>>>>>>>>>>>>>> due to the global interpreter lock. It would be up to the
>>>>>>>>>>>>>>>>>> runner in this case to make sure that the amount of work it
>>>>>>>>>>>>>>>>>> gives to each SDK harness best utilizes it while spinning up
>>>>>>>>>>>>>>>>>> an appropriate number of SDK harnesses.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to
>>>>>>>>>>>>>>>>>>> be Flink's pipelined execution mode. It runs multiple tasks
>>>>>>>>>>>>>>>>>>> in one task slot and produces a deadlock when the pipelined
>>>>>>>>>>>>>>>>>>> operators schedule the SDK harness DoFns in non-topological
>>>>>>>>>>>>>>>>>>> order.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks
>>>>>>>>>>>>>>>>>>> in topological order. Doing that is not easy because they
>>>>>>>>>>>>>>>>>>> run in separate Flink operators and the SDK Harness would
>>>>>>>>>>>>>>>>>>> have to have insight into the execution graph (which is not
>>>>>>>>>>>>>>>>>>> desirable).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1) is to
>>>>>>>>>>>>>>>>>>> ensure that the number of threads in the SDK harness matches
>>>>>>>>>>>>>>>>>>> the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues
>>>>>>>>>>>>>>>>>>> together horizontal parts of the execution graph, also in
>>>>>>>>>>>>>>>>>>> multiple threads. So I agree with your proposed solution.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited task
>>>>>>>>>>>>>>>>>>> > parallelism on SDKHarness.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ Job on Flink
>>>>>>>>>>>>>>>>>>> >     standalone cluster.
>>>>>>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>>>> >   * Everything is running on a single machine with single
>>>>>>>>>>>>>>>>>>> >     Flink task manager.
>>>>>>>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules different dependent
>>>>>>>>>>>>>>>>>>> > subtasks concurrently on the Flink worker as long as it can
>>>>>>>>>>>>>>>>>>> > get slots. Each map task is then executed on the SDKHarness.
>>>>>>>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before MapA
>>>>>>>>>>>>>>>>>>> > and hence gets into the execution queue before MapA. Because
>>>>>>>>>>>>>>>>>>> > we only have 1 execution thread on the SDKHarness, MapA will
>>>>>>>>>>>>>>>>>>> > never get a chance to execute as MapB will never release the
>>>>>>>>>>>>>>>>>>> > execution thread. MapB will wait for input from MapA. This
>>>>>>>>>>>>>>>>>>> > gets us to a deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than the
>>>>>>>>>>>>>>>>>>> > expected number of subtasks in the pipeline.
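
The issue and the mitigation above can be reproduced in miniature with a plain thread pool. This is a toy model, not Beam or Flink code: run_pipeline, map_a and map_b are hypothetical names, a queue stands in for the data channel between stages, and a timeout stands in for the deadlock so that the example terminates (the real harness would wait forever).

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(worker_count, timeout_s=0.5):
    """Return True if MapB received MapA's output, False if it starved."""
    channel = queue.Queue()

    def map_a():
        # Upstream stage: produces the element MapB is waiting for.
        channel.put("element")

    def map_b():
        # Downstream stage: occupies its thread until input arrives.
        try:
            channel.get(timeout=timeout_s)
            return True
        except queue.Empty:
            return False

    with ThreadPoolExecutor(max_workers=worker_count) as harness:
        b = harness.submit(map_b)  # MapB reaches the harness first
        a = harness.submit(map_a)  # MapA is queued behind it
        finished = b.result()
        a.result()
    return finished
```

With worker_count=1 the single thread is held by MapB, so MapA only runs after the timeout and the function returns False. With worker_count=2 both stages run concurrently and it returns True.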
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner and
>>>>>>>>>>>>>>>>>>> >     make sure that we have more threads than max concurrency.
>>>>>>>>>>>>>>>>>>> >     This approach assumes that Beam has insight into the
>>>>>>>>>>>>>>>>>>> >     runner execution plan and can make decisions based on it.
>>>>>>>>>>>>>>>>>>> >  2. We dynamically create threads and cache them with a high
>>>>>>>>>>>>>>>>>>> >     upper bound in the SDKHarness. We can warn if we are
>>>>>>>>>>>>>>>>>>> >     hitting the upper bound of threads. This approach assumes
>>>>>>>>>>>>>>>>>>> >     that the runner does a good job of scheduling and will
>>>>>>>>>>>>>>>>>>> >     distribute tasks more or less evenly.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > We expect good scheduling from runners so I prefer approach
>>>>>>>>>>>>>>>>>>> > 2. It is simpler to implement and the implementation is not
>>>>>>>>>>>>>>>>>>> > runner specific. This approach utilizes resources better as
>>>>>>>>>>>>>>>>>>> > it creates only as many threads as needed instead of the
>>>>>>>>>>>>>>>>>>> > peak thread requirement.
>>>>>>>>>>>>>>>>>>> > And last but not least, it gives the runner control over
>>>>>>>>>>>>>>>>>>> > managing truly active tasks.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > Please let me know if I am missing something and your
>>>>>>>>>>>>>>>>>>> > thoughts on the approach.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
