Thanks for writing down and explaining the problem, Thomas. Let me try to
tease some of the topics apart.

First, the basic setup is currently as follows: there are two worker
processes, (A) the "SDK harness" and (B) the "Runner harness", that need
to communicate. A connects to B. The fundamental endpoints of B -- logging,
provisioning, artifacts and control -- as well as an id are provided to A
via command line parameters. A is not expected to be able to connect to the
control port without first obtaining the pipeline options (from
provisioning) and the staged files (from artifacts). As an aside, this is
where the separate boot.go code comes in handy. A can assume it will be
restarted if it exits. A should not assume the given endpoints are up when
it starts and should make blocking calls with a timeout (if an endpoint is
not up and A exits, it is restarted anyway and will retry). Note that the
data plane endpoints are part of the control instructions and need not be
known or allocated at startup, or even be served by the same TM.
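
To make that contract concrete, here is a rough sketch (in the spirit of
boot.go, not the actual code) of what A does with the endpoints handed to
it on the command line. The flag names and the commented-out
provision/artifact/harness calls are illustrative only; the important part
is the blocking dial with a timeout and the exit-and-let-it-restart error
handling:

package main

import (
        "context"
        "flag"
        "log"
        "time"

        "google.golang.org/grpc"
)

var (
        id        = flag.String("id", "", "Worker id assigned by B")
        logging   = flag.String("logging_endpoint", "", "Logging endpoint of B")
        provision = flag.String("provision_endpoint", "", "Provision endpoint of B")
        artifact  = flag.String("artifact_endpoint", "", "Artifact endpoint of B")
        control   = flag.String("control_endpoint", "", "Control endpoint of B")
)

// dial blocks until the endpoint accepts connections or the timeout
// expires. On failure, A simply exits and relies on being restarted.
func dial(endpoint string, timeout time.Duration) *grpc.ClientConn {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()
        conn, err := grpc.DialContext(ctx, endpoint, grpc.WithInsecure(), grpc.WithBlock())
        if err != nil {
                log.Fatalf("failed to connect to %v: %v", endpoint, err)
        }
        return conn
}

func main() {
        flag.Parse()

        // (1) Pipeline options come from the provision service.
        provConn := dial(*provision, time.Minute)
        defer provConn.Close()
        // info := getProvisionInfo(provConn)        // illustrative

        // (2) Staged files come from the artifact service.
        artConn := dial(*artifact, time.Minute)
        defer artConn.Close()
        // stageTo("/tmp/staged", artConn)           // illustrative

        // (3) Only then does A connect to control and start the actual
        // SDK harness, passing along the id and the logging endpoint.
        ctrlConn := dial(*control, time.Minute)
        defer ctrlConn.Close()
        // runHarness(*id, *logging, ctrlConn, info) // illustrative
}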

Second, whether or not docker is used is rather an implementation detail,
but if we use Kubernetes (or other such options) then some constraints come
into play.

Either way, two scenarios work well:
   (1) B starts A: The ULR and the Flink prototype do this. B will delay
starting A until it has decided which endpoints to use. This approach
requires B to do process/container management, which we'd rather not have
to do at scale, but it's convenient for local runners.
   (2) B has its (local) endpoints configured or fixed: A and B can be
started concurrently. Dataflow does this. Kubernetes lends itself well to
this approach (and handles container management for us); a sketch of B's
side in this mode follows below.

The Flink on Kubernetes scenario described above doesn't:
   (3) B must use randomized (local) endpoints _and_ A and B are started
concurrently: A would not know where to connect.
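
For reference, (2) only requires that B commit to its (local) ports ahead
of time. A minimal sketch of B's side, assuming a fixed control port (8099
is made up, and the actual Fn API service registration is elided):

package main

import (
        "log"
        "net"

        "google.golang.org/grpc"
)

func main() {
        // B binds a fixed, pre-agreed control port. A can then be started
        // concurrently and pointed at localhost:8099; its blocking dial
        // (see the sketch above) bridges the startup race.
        lis, err := net.Listen("tcp", ":8099")
        if err != nil {
                log.Fatalf("cannot bind control port: %v", err)
        }
        s := grpc.NewServer()
        // fnpb.RegisterBeamFnControlServer(s, newControlService()) // illustrative
        log.Fatal(s.Serve(lis))
}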

Perhaps I'm not understanding the constraints of the TM well enough, but
can we really not open a configured/fixed port from the TM -- especially in
a network-isolated Kubernetes pod? Adding a third process (C), a "proxy",
to the pod might be an alternative option and would morph (3) into (2). B
would configure C when it's ready. A would connect to C, but be blocked
until B has configured it. C could perhaps even serve logging, provisioning
and artifacts without B. And the data plane would not go over C anyway. If
proxying control is a concern, then alternatively we could add an
indirection to the container contract and provide the control endpoint via
the provisioning API, say, or via a new discovery service.
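
To make the proxy idea concrete, here is a crude TCP-level sketch of C.
The fixed port, the SetBackend hook and all names are invented for
illustration (in practice B would configure C over some small RPC): C
listens on the fixed port A is pointed at, holds incoming connections until
B has registered its dynamically allocated control endpoint, and then just
forwards bytes. The data plane never passes through it.

package main

import (
        "io"
        "log"
        "net"
        "sync"
)

// proxy accepts connections from A on a fixed port and forwards them to
// whatever control endpoint B registers once it knows it. Until then,
// accepted connections simply wait.
type proxy struct {
        mu      sync.Mutex
        ready   *sync.Cond
        backend string // B's dynamically allocated control endpoint
}

func newProxy() *proxy {
        p := &proxy{}
        p.ready = sync.NewCond(&p.mu)
        return p
}

// SetBackend is what B would call (via some small config RPC, not shown)
// once it has allocated its control endpoint.
func (p *proxy) SetBackend(addr string) {
        p.mu.Lock()
        p.backend = addr
        p.mu.Unlock()
        p.ready.Broadcast()
}

func (p *proxy) handle(conn net.Conn) {
        p.mu.Lock()
        for p.backend == "" {
                p.ready.Wait() // block A until B has configured us
        }
        backend := p.backend
        p.mu.Unlock()

        out, err := net.Dial("tcp", backend)
        if err != nil {
                conn.Close()
                return
        }
        go func() { io.Copy(out, conn); out.Close() }()
        go func() { io.Copy(conn, out); conn.Close() }()
}

func main() {
        p := newProxy()
        // B would call p.SetBackend("localhost:<dynamic port>") when ready.
        lis, err := net.Listen("tcp", ":8098") // the fixed port A is pointed at
        if err != nil {
                log.Fatal(err)
        }
        for {
                conn, err := lis.Accept()
                if err != nil {
                        log.Fatal(err)
                }
                go p.handle(conn)
        }
}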

There are of course other options and tradeoffs, but having Flink work on
Kubernetes and not go against the grain seems desirable to me.

Thanks,
 Henning


On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> The current plan for running the SDK harness is to execute docker to
> launch SDK containers with service endpoints provided by the runner in the
> docker command line.
>
> In the case of Flink runner (prototype), the service endpoints are
> dynamically allocated per executable stage. There is typically one Flink
> task manager running per machine. Each TM has multiple task slots. A subset
> of these task slots will run the Beam executable stages. Flink allows
> multiple jobs in one TM, so we could have executable stages of different
> pipelines running in a single TM, depending on how users deploy. The
> prototype also has no cleanup for the SDK containers; they remain running
> and orphaned once the runner is gone.
>
> I'm trying to find out how this approach can be augmented for deployment
> on Kubernetes. Our deployments won't allow multiple jobs per task manager,
> so all task slots will belong to the same pipeline context. The intent is
> to deploy SDK harness containers along with TMs in the same pod. No
> assumption can be made about the order in which the containers are started,
> and the SDK container wouldn't know the connect address at startup (it can
> only be discovered after the pipeline gets deployed into the TMs).
>
> I talked about that a while ago with Henning and one idea was to set a
> fixed endpoint address so that the boot code in the SDK container knows
> upfront where to connect to, even when that endpoint isn't available yet.
> This approach may work with minimal changes to runner and little or no
> change to SDK container (as long as the SDK is prepared to retry). The
> downside is that all (parallel) task slots of the TM will use the same SDK
> worker, which will likely lead to performance issues, at least with the
> Python SDK that we are planning to use.
>
> An alternative may be to define an SDK worker pool per pod, with a
> discovery mechanism for workers to find the runner endpoints and a
> coordination mechanism that distributes the dynamically allocated endpoints
> that are provided by the executable stage task slots over the available
> workers.
>
> Any thoughts on this? Is anyone else looking at a docker free deployment?
>
> Thanks,
> Thomas
>
>
