I do agree that the runner usually takes longer to reach the point where it tries to connect than the external worker pool takes to become available; I suppose that is probably why the external service pool works the way it currently does.
However, I am not 100% confident that it won’t happen in practice, because the design does seem to allow such a failure. The question is: do we want to rely on the assumption that the worker pool usually starts before the runner connects to it, or should we add some preventative measures to keep it from ever happening? It may be OK to depend on the assumption; in addition, data points showing how likely it is that the worker is ready first would give us more confidence.

What are your thoughts?

Best,
Ke

> On Jun 2, 2021, at 12:37 PM, Kyle Weaver <[email protected]> wrote:
>
> As far as I'm aware there's nothing strictly guaranteeing the worker pool has
> been started. But in practice it takes a while for the job to start up - the
> pipeline needs to be constructed, sent to the job server, translated, and
> then the runner needs to start the job, etc. before the external environment
> request is sent. So I'm not sure whether or not the problem you describe
> would ever actually happen.
>
> On Wed, Jun 2, 2021 at 11:01 AM Ke Wu <[email protected]> wrote:
> Hi Kyle,
>
> Thanks for reviewing https://github.com/apache/beam/pull/14923.
> I would like to follow up with
> the deadline & waitForReady on ExternalEnvironment here.
>
> In Kubernetes, if my understanding is correct, there is no ordering support
> when bringing up containers (init container does not work for external mode
> because it requires init container to complete). Therefore, if we bring up
> the external worker pool container together with the runner container, which
> is one of the approaches supported by Flink Runner on K8s [1], then it is possible
> that the external worker pool service may not be available when the runner tries to
> connect.
>
> What is the recommended way to guarantee the availability of the external worker
> pool service before connecting?
> We would like to adopt the pattern to support
> Samza runner on K8s as well.
>
> Best,
> Ke
>
>
> [1] [Public] Beam Flink K8s:
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>
>> On May 27, 2021, at 6:11 PM, Ke Wu <[email protected]> wrote:
>>
>> Good to know. We are working on running java portable pipeline for Samza
>> runner and I believe we could take on the task to enhance the java workflow
>> to support timeout/retry etc on gRPC calls.
>>
>> Created BEAM-12419 <https://issues.apache.org/jira/browse/BEAM-12419> to
>> track the work.
>>
>> Best,
>> Ke
>>
>>> On May 27, 2021, at 4:30 PM, Kyle Weaver <[email protected]> wrote:
>>>
>>> I don't think there's any specific reason we don't set a timeout, I'm
>>> guessing it was just never worth the effort of implementing. If it's stuck
>>> it should be pretty obvious from the logs: "Still waiting for startup of
>>> environment from {} for worker id {}"
>>>
>>> On Thu, May 27, 2021 at 4:04 PM Ke Wu <[email protected]> wrote:
>>> Hi Kyle,
>>>
>>> Thank you for the prompt response, and apologies for the late reply.
>>>
>>> [1] seems to be only available in python portable_runner but not java
>>> PortableRunner; is it intended, or could we add similar changes in java as
>>> well?
>>>
>>> [2] makes sense to block since the wait/retry is handled in the previous
>>> prepare(); however, is there any specific reason why we do not want to
>>> support timeout in start worker request?
>>>
>>> Best,
>>> Ke
>>>
>>>> On May 14, 2021, at 11:25 AM, Kyle Weaver <[email protected]> wrote:
>>>>
>>>> 1. and 2. are both facilitated by GRPC, which takes care of most of the
>>>> retry/wait logic.
>>>> In some places we have a configurable timeout (which
>>>> defaults to 60s) [1], while in other places we block [2][3].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-7933
>>>> [2]
>>>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>>> [3]
>>>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>>>
>>>> On Fri, May 14, 2021 at 10:51 AM Ke Wu <[email protected]> wrote:
>>>> Hello All,
>>>>
>>>> I came across this question when I am reading Beam on Flink on Kubernetes
>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.x9qy4wlfgc1g>
>>>> and flink-on-k8s-operator
>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/0310df76d6e2128cd5d2bc51fae4e842d370c463>
>>>> and realized that there seems no retry/wait logic built in PortableRunner
>>>> nor ExternalEnvironmentFactory, (correct me if I am wrong) which creates
>>>> implications that:
>>>>
>>>> 1. Job Server needs to be ready to accept request before SDK Client could
>>>> submit request.
>>>> 2. External Worker Pool Service needs to be ready to accept start/stop
>>>> worker request before runner starts to request.
>>>>
>>>> This may bring some challenges on k8s since Flink opts to use the
>>>> multi-container pattern when bringing up a beam portable pipeline; in addition,
>>>> I don’t find any special lifecycle management in place to guarantee the
>>>> order, e.g. the External Worker Pool Service container starting and being ready
>>>> before the task manager container starts making requests.
>>>>
>>>> I am wondering if I missed anything that guarantees the readiness of the
>>>> dependent service, or whether we are relying on the dependent containers being much
>>>> lighter weight, so that they should, most of the time, be ready before the other
>>>> container starts to make requests.
>>>>
>>>> Best,
>>>> Ke
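
For reference, the "preventative" option raised at the top of this thread (do not assume the external worker pool is already up; wait for it with a bounded deadline before sending the start-worker request) can be sketched roughly as below. This is only an illustration in Python using the standard library, not the Beam implementation and not the change proposed in BEAM-12419. The function name `wait_for_port`, its parameters, and the backoff values are hypothetical. The 60s default merely mirrors the configurable timeout mentioned in the thread. A successful TCP connect is also weaker than gRPC-level readiness (deadline plus waitForReady), so treat it as a first approximation.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=60.0, initial_backoff_s=0.1, max_backoff_s=2.0):
    """Return True once host:port accepts a TCP connection, False if the deadline passes.

    Hypothetical helper for illustration only; names and defaults are assumptions.
    """
    deadline = time.monotonic() + timeout_s
    backoff = initial_backoff_s
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        try:
            # Cap each attempt so a single slow connect cannot overrun the deadline.
            with socket.create_connection((host, port), timeout=min(remaining, 1.0)):
                return True
        except OSError:
            # Not listening yet (or unreachable): sleep, then retry with
            # exponential backoff, never sleeping past the deadline.
            time.sleep(min(backoff, max(0.0, deadline - time.monotonic())))
            backoff = min(backoff * 2, max_backoff_s)
```

A caller on the runner side would invoke something like `wait_for_port("localhost", 50000)` (address and port are placeholders) before issuing the start-worker request, and fail the job with a clear error if it returns False instead of blocking indefinitely.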
