Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC
header when it connects to the TM. The TM can use a fixed port and
multiplex requests based on that id - to match the SDK harness with the
appropriate job/slot/whatnot. The relationship between SDK harness and TM
is not limited to 1:1, but rather many:1. We'll likely need that for
cross-language as well. Wouldn't multiplexing on a single port for the
control plane be the easiest solution for both #1 and #2? The data plane
can still use various dynamically-allocated ports.
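
As a rough sketch of what I have in mind (names are illustrative, not the
actual harness or runner code), the id would simply travel as gRPC metadata
on the control connection and the TM would demultiplex on it:

import (
    "context"

    "google.golang.org/grpc/metadata"
)

// SDK harness side: attach the id handed to the container to every outgoing
// control call, so that a single fixed TM port can serve many harnesses.
func withWorkerID(ctx context.Context, workerID string) context.Context {
    return metadata.AppendToOutgoingContext(ctx, "worker_id", workerID)
}

// TM side: read the id off an incoming call and route it to the matching
// job/slot.
func workerIDFrom(ctx context.Context) (string, bool) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok || len(md.Get("worker_id")) == 0 {
        return "", false
    }
    return md.Get("worker_id")[0], true
}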

On Kubernetes, we're somewhat constrained by the pod lifetime, and multi-job
TMs might not be as natural to achieve.

Thanks,
 Henning

On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <t...@apache.org> wrote:

> Hi Henning,
>
> Here is a page that explains the scheduling and overall functioning of the
> task manager in Flink:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>
> Here are the 2 issues:
>
> #1 each task manager process gets assigned multiple units of execution in
> its task slots. So when we deploy a Beam pipeline, we can end up with multiple
> executable stages running in a single TM JVM.
>
> This is where a 1-to-1 relationship between TM and SDK harness can lead to a
> bottleneck (all task slots of a single TM push their work to a single SDK
> container).
>
> #2 in a deployment where multiple pipelines share a Flink cluster, the SDK
> harness per TM approach wouldn't work logically. We would need to have
> multiple SDK containers, not just for efficiency reasons.
>
> This would not be an issue for the deployment scenario I'm looking at, but
> it needs to be considered for general Flink runner deployment.
>
> Regarding the assignment of fixed endpoints within the TM, that is
> possible but it doesn't address #1 and #2.
>
> I hope this clarifies things.
>
> Thanks,
> Thomas
>
>
> On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <hero...@google.com> wrote:
>
>> Thanks for writing down and explaining the problem, Thomas. Let me try to
>> tease some of the topics apart.
>>
>> First, the basic setup is currently as follows: there are 2 worker
>> processes, (A) "SDK harness" and (B) "Runner harness", that need to
>> communicate. A connects to B. The fundamental endpoints of B -- logging,
>> provisioning, artifacts and control -- as well as an id are provided to A
>> via command line parameters. A is not expected to be able to connect to the
>> control port without first obtaining pipeline options (from provisioning)
>> and staged files (from artifacts). As an aside, this is where the separate
>> boot.go code comes in handy. A can assume it will be restarted if it
>> exits. A does not assume the given endpoints are up when it starts and
>> should make blocking calls with a timeout (if a call times out and A exits,
>> it is restarted anyway and will retry). Note that the data plane endpoints
>> are part of the control instructions and need not be known or allocated at
>> startup, or even be served by the same TM.
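>>
>> To make that concrete, the blocking connect in the boot code looks roughly
>> like this (a sketch only; the function name and timeout are made up, not
>> lifted from boot.go):
>>
>> import (
>>     "context"
>>     "log"
>>     "time"
>>
>>     "google.golang.org/grpc"
>> )
>>
>> // dialControl blocks until the control endpoint becomes reachable or the
>> // timeout expires; on failure the harness simply exits and relies on
>> // being restarted by the environment.
>> func dialControl(endpoint string) *grpc.ClientConn {
>>     ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
>>     defer cancel()
>>     conn, err := grpc.DialContext(ctx, endpoint,
>>         grpc.WithInsecure(), grpc.WithBlock())
>>     if err != nil {
>>         log.Fatalf("control endpoint %v not reachable: %v", endpoint, err)
>>     }
>>     return conn
>> }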
>>
>> Second, whether or not docker is used is rather an implementation detail,
>> but if we use Kubernetes (or other such options) then some constraints come
>> into play.
>>
>> Either way, two scenarios work well:
>>    (1) B starts A: the ULR and the Flink prototype do this. B will delay
>> starting A until it has decided which endpoints to use. This approach
>> requires B to do process/container management, which we'd rather not have
>> to do at scale. But it's convenient for local runners.
>>    (2) B has its (local) endpoints configured or fixed: A and B can be
>> started concurrently. Dataflow does this. Kubernetes lends itself well to
>> this approach (and handles container management for us).
>>
>> The Flink-on-Kubernetes scenario described above doesn't work well:
>>    (3) B must use randomized (local) endpoints _and_ A and B are started
>> concurrently: A would not know where to connect.
>>
>> Perhaps I'm not understanding the constraints of the TM well enough, but
>> can we really not open a configured/fixed port from the TM -- especially in
>> a network-isolated Kubernetes pod? Adding a third process (C) "proxy" to
>> the pod might be an alternative option and morph (3) into (2). B would
>> configure C when it's ready. A would connect to C, but be blocked until B
>> has configured it. C could perhaps even serve logging, provisioning, and
>> artifacts without B. And the data plane would not go over C anyway. If
>> control proxying is a concern, then we could alternatively add an
>> indirection to the container contract and provide the control endpoint in
>> the provisioning api, say, or even a new discovery service.
>>
>> There are of course other options and tradeoffs, but having Flink work on
>> Kubernetes and not go against the grain seems desirable to me.
>>
>> Thanks,
>>  Henning
>>
>>
>> On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> The current plan for running the SDK harness is to execute docker to
>>> launch SDK containers with service endpoints provided by the runner in the
>>> docker command line.
>>>
>>> In the case of the Flink runner (prototype), the service endpoints are
>>> dynamically allocated per executable stage. There is typically one Flink
>>> task manager running per machine. Each TM has multiple task slots. A subset
>>> of these task slots will run the Beam executable stages. Flink allows
>>> multiple jobs in one TM, so we could have executable stages of different
>>> pipelines running in a single TM, depending on how users deploy. The
>>> prototype also has no cleanup for the SDK containers; they remain running
>>> and orphaned once the runner is gone.
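>>>
>>> (Roughly speaking, the runner shells out along these lines; the flag
>>> names below are illustrative, not necessarily the exact container
>>> contract.)
>>>
>>> import "os/exec"
>>>
>>> // Launch an SDK harness container, passing the dynamically allocated
>>> // endpoints and the worker id on the docker command line.
>>> func launchSDKHarness(image, id, logging, provision, artifact, control string) error {
>>>     cmd := exec.Command("docker", "run", "--rm", image,
>>>         "--id="+id,
>>>         "--logging_endpoint="+logging,
>>>         "--provision_endpoint="+provision,
>>>         "--artifact_endpoint="+artifact,
>>>         "--control_endpoint="+control)
>>>     return cmd.Start()
>>> }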
>>>
>>> I'm trying to find out how this approach can be augmented for deployment
>>> on Kubernetes. Our deployments won't allow multiple jobs per task manager,
>>> so all task slots will belong to the same pipeline context. The intent is
>>> to deploy SDK harness containers along with TMs in the same pod. No
>>> assumption can be made about the order in which the containers are started,
>>> and the SDK container wouldn't know the connect address at startup (it can
>>> only be discovered after the pipeline gets deployed into the TMs).
>>>
>>> I talked about that a while ago with Henning and one idea was to set a
>>> fixed endpoint address so that the boot code in the SDK container knows
>>> upfront where to connect to, even when that endpoint isn't available yet.
>>> This approach may work with minimal changes to the runner and little or
>>> no change to the SDK container (as long as the SDK is prepared to retry). The
>>> downside is that all (parallel) task slots of the TM will use the same SDK
>>> worker, which will likely lead to performance issues, at least with the
>>> Python SDK that we are planning to use.
>>>
>>> An alternative may be to define an SDK worker pool per pod, with a
>>> discovery mechanism for workers to find the runner endpoints and a
>>> coordination mechanism that distributes the dynamically allocated endpoints
>>> that are provided by the executable stage task slots over the available
>>> workers.
>>>
>>> Any thoughts on this? Is anyone else looking at a docker-free deployment?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>
