Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC header when it connects to the TM. The TM can use a fixed port and multiplex requests based on that id - to match the SDK harness with the appropriate job/slot/whatnot. The relationship between SDK harness and TM is not limited to 1:1, but rather many:1. We'll likely need that for cross-language as well. Wouldn't multiplexing on a single port for the control plane be the easiest solution for both #1 and #2? The data plane can still use various dynamically-allocated ports.
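For illustration, here is a minimal sketch (in Go) of the two halves of that: the harness attaching its id as a gRPC header, and the TM demultiplexing streams on a fixed control port by that header. The "worker_id" key and the registry shape are placeholders, not a proposed contract:

package main

import (
	"context"
	"fmt"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// Harness side: attach the assigned id to every RPC on the control channel.
func controlContext(id string) context.Context {
	return metadata.AppendToOutgoingContext(context.Background(), "worker_id", id)
}

// TM side: one gRPC server on a fixed port; route each stream by the header.
var slots sync.Map // worker id -> per-job/slot state, registered on deploy

func demux(srv interface{}, ss grpc.ServerStream,
	info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if !ok || len(md.Get("worker_id")) == 0 {
		return fmt.Errorf("missing worker_id header")
	}
	if _, known := slots.Load(md.Get("worker_id")[0]); !known {
		return fmt.Errorf("unknown worker %q", md.Get("worker_id")[0])
	}
	return handler(srv, ss) // the control service re-reads the id as needed
}

func main() {
	s := grpc.NewServer(grpc.StreamInterceptor(demux))
	_ = s // register the control service here, then Serve on the fixed port
	_ = controlContext("worker-7")
}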
On Kubernetes, we're somewhat constrained by the pod lifetime, and multi-job TMs might not be as natural to achieve.

Thanks,
Henning

On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <t...@apache.org> wrote:

> Hi Henning,
>
> Here is a page that explains the scheduling and overall functioning of the task manager in Flink:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>
> Here are the 2 issues:
>
> #1 each task manager process gets assigned multiple units of execution into task slots. So when we deploy a Beam pipeline, we can end up with multiple executable stages running in a single TM JVM.
>
> This is where a 1-to-1 relationship between TM and SDK harness can lead to a bottleneck (all task slots of a single TM push their work to a single SDK container).
>
> #2 in a deployment where multiple pipelines share a Flink cluster, the SDK-harness-per-TM approach wouldn't work logically. We would need multiple SDK containers, not just for efficiency reasons.
>
> This would not be an issue for the deployment scenario I'm looking at, but it needs to be considered for general Flink runner deployment.
>
> Regarding the assignment of fixed endpoints within the TM, that is possible, but it doesn't address #1 and #2.
>
> I hope this clarifies?
>
> Thanks,
> Thomas
>
>
> On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <hero...@google.com> wrote:
>
>> Thanks for writing down and explaining the problem, Thomas. Let me try to tease some of the topics apart.
>>
>> First, the basic setup is currently as follows: there are 2 worker processes, (A) "SDK harness" and (B) "Runner harness", that need to communicate. A connects to B. The fundamental endpoints of B (logging, provisioning, artifacts and control) as well as an id are provided to A via command line parameters. A is not expected to be able to connect to the control port without first obtaining pipeline options (from provisioning) and staged files (from artifacts). As an aside, this is where the separate boot.go code comes in handy. A can assume it will be restarted if it exits. A does not assume the given endpoints are up when started and should make blocking calls with a timeout (and if it still fails and exits, it is restarted anyway and will retry). Note that the data plane endpoints are part of the control instructions and need not be known or allocated at startup, or even be served by the same TM.
>>
>> Second, whether or not docker is used is rather an implementation detail, but if we use Kubernetes (or other such options) then some constraints come into play.
>>
>> Either way, two scenarios work well:
>> (1) B starts A: The ULR and the Flink prototype do this. B will delay starting A until it has decided which endpoints to use. This approach requires B to do process/container management, which we'd rather not have to do at scale. But it's convenient for local runners.
>> (2) B has its (local) endpoints configured or fixed: A and B can be started concurrently. Dataflow does this. Kubernetes lends itself well to this approach (and handles container management for us).
>>
>> The Flink-on-Kubernetes scenario described above doesn't work well:
>> (3) B must use randomized (local) endpoints _and_ A and B are started concurrently: A would not know where to connect.
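[Interjecting inline: to make the boot contract above concrete, here is a minimal sketch of what A does at startup. The flag names and the two-minute timeout are illustrative, not the actual container contract.]

package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
)

var (
	id      = flag.String("id", "", "worker id assigned by the runner")
	control = flag.String("control_endpoint", "", "B's control endpoint")
)

func main() {
	flag.Parse()

	// The endpoint may not be up yet: block with a timeout instead of
	// failing fast.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	conn, err := grpc.DialContext(ctx, *control, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		// Exiting is fine: per the contract, A is restarted and retries.
		log.Fatalf("worker %s: control endpoint unreachable: %v", *id, err)
	}
	defer conn.Close()
	log.Printf("worker %s connected to %s", *id, *control)
	// ... fetch pipeline options (provisioning) and staged files (artifacts),
	// then run the harness proper against conn.
}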
>>
>> Perhaps I'm not understanding the constraints of the TM well enough, but can we really not open a configured/fixed port from the TM -- especially in a network-isolated Kubernetes pod? Adding a third process (C), a "proxy", to the pod might be an alternative option and would morph (3) into (2). B would configure C when it's ready. A would connect to C, but be blocked until B has configured it. C could perhaps even serve logging, provisioning, and artifacts without B. And the data plane would not go over C anyway. If control proxying is a concern, then alternatively we would add an indirection to the container contract and provide the control endpoint in the provisioning API, say, or even a new discovery service.
>>
>> There are of course other options and tradeoffs, but having Flink work on Kubernetes and not go against the grain seems desirable to me.
>>
>> Thanks,
>> Henning
>>
>>
>> On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> The current plan for running the SDK harness is to execute docker to launch SDK containers, with the service endpoints provided by the runner on the docker command line.
>>>
>>> In the case of the Flink runner (prototype), the service endpoints are dynamically allocated per executable stage. There is typically one Flink task manager running per machine. Each TM has multiple task slots. A subset of these task slots will run the Beam executable stages. Flink allows multiple jobs in one TM, so we could have executable stages of different pipelines running in a single TM, depending on how users deploy. The prototype also has no cleanup for the SDK containers; they remain running and orphaned once the runner is gone.
>>>
>>> I'm trying to find out how this approach can be augmented for deployment on Kubernetes. Our deployments won't allow multiple jobs per task manager, so all task slots will belong to the same pipeline context. The intent is to deploy SDK harness containers along with TMs in the same pod. No assumption can be made about the order in which the containers are started, and the SDK container wouldn't know the connect address at startup (it can only be discovered after the pipeline gets deployed into the TMs).
>>>
>>> I talked about that a while ago with Henning, and one idea was to set a fixed endpoint address so that the boot code in the SDK container knows upfront where to connect, even when that endpoint isn't available yet. This approach may work with minimal changes to the runner and little or no change to the SDK container (as long as the SDK is prepared to retry). The downside is that all (parallel) task slots of the TM will use the same SDK worker, which will likely lead to performance issues, at least with the Python SDK that we are planning to use.
>>>
>>> An alternative may be to define an SDK worker pool per pod, with a discovery mechanism for workers to find the runner endpoints and a coordination mechanism that distributes the dynamically allocated endpoints provided by the executable stage task slots over the available workers.
>>>
>>> Any thoughts on this? Is anyone else looking at a docker-free deployment?
>>>
>>> Thanks,
>>> Thomas
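PS: to make the proxy (C) idea from my earlier mail above concrete, here is a rough sketch. It assumes B registers its dynamically allocated control endpoint with C; a plain channel stands in for that registration mechanism, and the port and addresses are placeholders. C holds A's connections until B is ready and then just forwards bytes, so the data plane never touches it:

package main

import (
	"io"
	"log"
	"net"
)

func main() {
	// B registers its dynamically allocated control endpoint here; a channel
	// stands in for whatever registration mechanism we'd actually pick.
	backend := make(chan string, 1)
	backend <- "localhost:54321" // placeholder: in reality B calls in when ready

	ln, err := net.Listen("tcp", ":8099") // the fixed port A is configured with
	if err != nil {
		log.Fatal(err)
	}

	target := <-backend // hold off on serving until B has configured us

	for {
		client, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go func(c net.Conn) {
			defer c.Close()
			srv, err := net.Dial("tcp", target)
			if err != nil {
				return // A exits and is restarted; it will retry
			}
			defer srv.Close()
			go io.Copy(srv, c) // A -> B
			io.Copy(c, srv)    // B -> A
		}(client)
	}
}

Logging, provisioning and artifacts could be routed the same way, or served by C itself without B.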