Sounds good.

Regarding (2), the bootstrapping endpoints serve job-specific info, but a
Flink task manager's lifecycle isn't tied to a single job (it is a worker
that exists even before the pipeline is submitted). Therefore we will need
to fall back to (1). The boot code would fail to connect until the job is
deployed and the Flink runner has established the endpoint. This should
work fine as long as the boot code retries without causing the entire
container to exit; it may just add some noise to the logs?
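
Roughly, the retry would need to look something like the sketch below
(just an illustration on my side, not the actual boot code, which is Go;
the helper name and timeouts here are made up):

    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.Supplier;

    // Hypothetical helper for the boot code: keep retrying a startup call
    // (e.g. fetching pipeline options from the provisioning endpoint) until
    // the runner has brought the endpoint up, instead of exiting the container.
    public final class StartupRetry {
      public static <T> T callWithRetry(Supplier<T> call, Duration timeout, Duration backoff)
          throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (true) {
          try {
            return call.get();
          } catch (RuntimeException e) { // e.g. gRPC UNAVAILABLE while the job isn't deployed yet
            if (Instant.now().isAfter(deadline)) {
              throw e; // give up; the environment restarts the container and it tries again
            }
            // Until the executable stage is deployed, this is just noise in the logs.
            System.err.println("Endpoint not ready yet, retrying: " + e.getMessage());
            Thread.sleep(backoff.toMillis());
          }
        }
      }
    }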

In my scenario there won't be a second job that runs on the same task
manager, since we are planning to deploy Flink along with the application.
But Flink in general also supports a "session" mode where multiple jobs can
share the same set of task managers. In that case it would be necessary to
isolate the SDK workers, because a worker can only serve a single job
(unless the information you listed as static is identical across jobs).

Looking at the current runner code there will be some work in the
JobResourceManager/SingletonSdkHarnessManager neighborhood that I can pick
up once we have the basics working in master. Currently SDK workers can
only be distinguished by the port they connect to; the runner does not look
at the worker ID or make it available in any way. So support for
multiplexing has to be added. Perhaps Ben/Alex can comment on this?
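
For reference, what I have in mind for the multiplexing is roughly the
following (a sketch only, using plain grpc-java; the header and class
names are assumptions on my side, not existing runner code):

    import io.grpc.Context;
    import io.grpc.Contexts;
    import io.grpc.Metadata;
    import io.grpc.ServerCall;
    import io.grpc.ServerCallHandler;
    import io.grpc.ServerInterceptor;

    // Picks up the worker id that the SDK harness sends as a gRPC header when
    // it connects, so a single fixed control port can be shared by several SDK
    // workers and requests can be routed to the right job/executable stage.
    public final class WorkerIdInterceptor implements ServerInterceptor {
      private static final Metadata.Key<String> WORKER_ID_HEADER =
          Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
      public static final Context.Key<String> WORKER_ID_CTX = Context.key("worker_id");

      @Override
      public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
          ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        String workerId = headers.get(WORKER_ID_HEADER);
        // Attach the worker id to the call context; the control service can then
        // look up which executable stage / job this SDK harness belongs to.
        Context ctx = Context.current().withValue(WORKER_ID_CTX, workerId);
        return Contexts.interceptCall(ctx, call, headers, next);
      }
    }

The control service would then key its bookkeeping on the worker id
instead of (or in addition to) the port the worker connected on.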

Thanks,
Thomas


On Fri, Jun 8, 2018 at 10:19 AM, Henning Rohde <hero...@google.com> wrote:

> You're right. That is the idea.
>
> Two comments on the executable stage not being available yet:
>   (1) An SDK harness may either retry or fail (exit) if it can't
> connect/times out/gets an error. If it exits, the runner/environment is
> responsible for restarting the process/container. So it will effectively
> always retry. The boot code currently used tries to connect for 2 mins
> after which it gives up (and in turn is restarted and tries again). The
> 2min is set a bit arbitrarily, btw, so we can adjust it for the default
> containers.
>   (2) The 2 bootstrapping endpoints serve static information (pipeline
> options, artifacts, and job metadata) that may not require an executable
> stage -- for example, if the artifact service just serves data from HDFS.
> The control endpoint is mainly driven by the runner side, so the
> multiplexer could allow any SDK harness to connect, but it just wouldn't
> send any actual instructions until the executable stages were ready. So for
> 2nd jobs or if we have some global hooks into the TM (or deploy a separate
> process -- provisioning and artifacts are separate services to make this
> possible), it might be possible to allow the SDK harness to boot in
> parallel with the TM being fully ready. Disclaimer: I have a limited
> understanding of the Flink constraints here.
>
> Thanks,
>  Henning
>
>
>
> On Fri, Jun 8, 2018 at 7:49 AM Thomas Weise <t...@apache.org> wrote:
>
>> Yes, it did not occur to me that we have the identifier available for
>> this. I just took a fresh look at
>> https://s.apache.org/beam-fn-api-container-contract
>>
>> So it should be possible to start a pool of containers with pre-assigned
>> IDs in the pod, communicate the same set of IDs to the runner (via its
>> configuration) and then come up with some mechanism to assign executable
>> stages to worker IDs as part of the Flink operator initialization.
>>
>> By the time the SDK boot code calls the provisioning service to fetch the
>> pipeline options, the runner wouldn't be ready (either since the TM isn't
>> running or the executable stages were not deployed into it yet). So will
>> that call just retry until the endpoint becomes available? On the runner
>> side, the endpoint can only be activated (under the fixed address) when the
>> task slots are assigned.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>>
>>
>> On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <hero...@google.com> wrote:
>>
>>> Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC
>>> header when it connects to the TM. The TM can use a fixed port and
>>> multiplex requests based on that id - to match the SDK harness with the
>>> appropriate job/slot/whatnot. The relationship between SDK harness and TM
>>> is not limited to 1:1, but rather many:1. We'll likely need that for
>>> cross-language as well. Wouldn't multiplexing on a single port for the
>>> control plane be the easiest solution for both #1 and #2? The data plane
>>> can still use various dynamically-allocated ports.
>>>
>>> On Kubernetes, we're somewhat constrained by the pod lifetime and
>>> multi-job TMs might not be as natural to achieve.
>>>
>>> Thanks,
>>>  Henning
>>>
>>> On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Hi Henning,
>>>>
>>>> Here is a page that explains the scheduling and overall functioning of
>>>> the task manager in Flink:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>>>>
>>>> Here are the 2 issues:
>>>>
>>>> #1 each task manager process gets assigned multiple units of execution
>>>> into task slots. So when we deploy a Beam pipeline, we can end up with
>>>> multiple executable stages running in a single TM JVM.
>>>>
>>>> This is where a 1-to-1 relationship between TM and SDK harness can lead to
>>>> a bottleneck (all task slots of a single TM push their work to a single SDK
>>>> container).
>>>>
>>>> #2 in a deployment where multiple pipelines share a Flink cluster, the
>>>> SDK harness per TM approach wouldn't work logically. We would need to have
>>>> multiple SDK containers, not just for efficiency reasons.
>>>>
>>>> This would not be an issue for the deployment scenario I'm looking at,
>>>> but it needs to be considered for general Flink runner deployment.
>>>>
>>>> Regarding the assignment of fixed endpoints within the TM, that is
>>>> possible but it doesn't address #1 and #2.
>>>>
>>>> I hope this clarifies?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <hero...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for writing down and explaining the problem, Thomas. Let me try
>>>>> to tease some of the topics apart.
>>>>>
>>>>> First, the basic setup is currently as follows: there are 2 worker
>>>>> processes (A) "SDK harness" and (B) "Runner harness" that need to
>>>>> communicate. A connects to B. The fundamental endpoint(s) of B as well as
>>>>> an id -- logging, provisioning, artifacts and control -- are provided to A
>>>>> via command line parameters. A is not expected to be able to connect to 
>>>>> the
>>>>> control port without first obtaining pipeline options (from provisioning)
>>>>> and staged files (from artifacts). As an aside, this is where the separate
>>>>> boot.go code comes in handy. A can assume it will be restarted, if it
>>>>> exits. A does not assume the given endpoints are up when started and 
>>>>> should
>>>>> make blocking calls with timeout (but if not and exits, it is restarted
>>>>> anyway and will retry). Note that the data plane endpoints are part of the
>>>>> control instructions and need not be known or allocated at startup or even
>>>>> be served by the same TM.
>>>>>
>>>>> Second, whether or not docker is used is rather an implementation
>>>>> detail, but if we use Kubernetes (or other such options) then some
>>>>> constraints come into play.
>>>>>
>>>>> Either way, two scenarios work well:
>>>>>    (1) B starts A: The ULR and Flink prototype does this. B will delay
>>>>> starting A until it has decided which endpoints to use. This approach
>>>>> requires B to do process/container management, which we'd rather not have
>>>>> to do at scale. But it's convenient for local runners.
>>>>>    (2) B has its (local) endpoints configured or fixed: A and B can be
>>>>> started concurrently. Dataflow does this. Kubernetes lends itself well to
>>>>> this approach (and handles container management for us).
>>>>>
>>>>> The Flink on Kubernetes scenario described above doesn't:
>>>>>    (3) B must use randomized (local) endpoints _and_ A and B are
>>>>> started concurrently: A would not know where to connect.
>>>>>
>>>>> Perhaps I'm not understanding the constraints of the TM well enough,
>>>>> but can we really not open a configured/fixed port from the TM --
>>>>> especially in a network-isolated Kubernetes pod? Adding a third process 
>>>>> (C)
>>>>> "proxy" to the pod might by an alternative option and morph (3) into (2). 
>>>>> B
>>>>> would configure C when it's ready. A would connect to C, but be blocked
>>>>> until B has configured it. C could perhaps even serve logging,
>>>>> provisioning, and artifacts without B. And the data plane would not go 
>>>>> over
>>>>> C anyway. If control proxy'ing is a concern, then alternatively we would
>>>>> add an indirection to the container contract and provide the control
>>>>> endpoint in the provisioning api, say, or even a new discovery service.
>>>>>
>>>>> There are of course other options and tradeoffs, but having Flink work
>>>>> on Kubernetes and not go against the grain seems desirable to me.
>>>>>
>>>>> Thanks,
>>>>>  Henning
>>>>>
>>>>>
>>>>> On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The current plan for running the SDK harness is to execute docker to
>>>>>> launch SDK containers with service endpoints provided by the runner in 
>>>>>> the
>>>>>> docker command line.
>>>>>>
>>>>>> In the case of Flink runner (prototype), the service endpoints are
>>>>>> dynamically allocated per executable stage. There is typically one Flink
>>>>>> task manager running per machine. Each TM has multiple task slots. A 
>>>>>> subset
>>>>>> of these task slots will run the Beam executable stages. Flink allows
>>>>>> multiple jobs in one TM, so we could have executable stages of different
>>>>>> pipelines running in a single TM, depending on how users deploy. The
>>>>>> prototype also has no cleanup for the SDK containers, they remain running
>>>>>> and orphaned once the runner is gone.
>>>>>>
>>>>>> I'm trying to find out how this approach can be augmented for
>>>>>> deployment on Kubernetes. Our deployments won't allow multiple jobs per
>>>>>> task manager, so all task slots will belong to the same pipeline context.
>>>>>> The intent is to deploy SDK harness containers along with TMs in the same
>>>>>> pod. No assumption can be made about the order in which the containers 
>>>>>> are
>>>>>> started, and the SDK container wouldn't know the connect address at 
>>>>>> startup
>>>>>> (it can only be discovered after the pipeline gets deployed into the 
>>>>>> TMs).
>>>>>>
>>>>>> I talked about that a while ago with Henning and one idea was to set
>>>>>> a fixed endpoint address so that the boot code in the SDK container knows
>>>>>> upfront where to connect to, even when that endpoint isn't available yet.
>>>>>> This approach may work with minimal changes to runner and little or no
>>>>>> change to SDK container (as long as the SDK is prepared to retry). The
>>>>>> downside is that all (parallel) task slots of the TM will use the same 
>>>>>> SDK
>>>>>> worker, which will likely lead to performance issues, at least with the
>>>>>> Python SDK that we are planning to use.
>>>>>>
>>>>>> An alternative may be to define an SDK worker pool per pod, with a
>>>>>> discovery mechanism for workers to find the runner endpoints and a
>>>>>> coordination mechanism that distributes the dynamically allocated 
>>>>>> endpoints
>>>>>> that are provided by the executable stage task slots over the available
>>>>>> workers.
>>>>>>
>>>>>> Any thoughts on this? Is anyone else looking at a docker free
>>>>>> deployment?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>
>>
