Hm, how about this? Start/stop the server in @Setup/@Teardown (probably with
ref counting, since you can have multiple DoFn instances on one host), and
then use the load balancer's health checks to prevent routing to instances
without the SDF running?
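Roughly what I'm picturing (untested sketch; the port, class name, and
handler wiring are all made up, and note that @Teardown is best-effort,
which is why the LB health check still matters):

import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.beam.sdk.transforms.DoFn;

// Ref-counted shared server: N DoFn instances on one host share a single
// listener on the well-known port; the last one out stops it.
class ServerPerHostFn extends DoFn<String, String> {
  private static final Object LOCK = new Object();
  private static HttpServer server; // one per JVM
  private static int refCount = 0;

  @Setup
  public void setup() throws IOException {
    synchronized (LOCK) {
      if (refCount++ == 0) {
        server = HttpServer.create(new InetSocketAddress(8080), 0);
        // TODO: register handlers that enqueue requests for processing.
        server.start();
      }
    }
  }

  @Teardown
  public void teardown() {
    synchronized (LOCK) {
      if (--refCount == 0 && server != null) {
        server.stop(0);
        server = null;
      }
    }
  }

  @ProcessElement
  public void process(@Element String element, OutputReceiver<String> out) {
    out.output(element); // placeholder; real work would drain the server
  }
}
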
Curious how you'll get around the firewall rules, though; I've wanted to do
something similar in the past and that was the biggest issue :)

On Thu, Oct 7, 2021 at 1:50 PM Daniel Collins <dpcoll...@google.com> wrote:

> Yes a JvmInitializer would work great as well.
>
> To be clear, I don't think there's an issue with JvmInitializer instead of
> a Java static. However, it doesn't seem to solve the problem, which is
> ensuring messages received by the server are not stranded on a machine
> that will never accept them for processing (i.e. will never run the SDF
> implementation which pulls them out of the buffer).
>
> I'm actually not super concerned about splitting: since there is no
> concept of progress through the restriction of "all messages the user may
> send in the future", you can just divide the restriction infinitely
> without issue (i.e. always allow a split), as long as there is some way
> to ensure the runner will run the SDF on every JVM it is loaded on.
>
> On Thu, Oct 7, 2021 at 1:38 PM Luke Cwik <lc...@google.com> wrote:
>
>> Yes a JvmInitializer would work great as well.
>>
>> The difference with an SDF would be that you could use other pipeline
>> constructs before the SDF to add more HTTP listeners with different
>> configurations. Also, once there is a runner that supports dynamic
>> splitting, you would be able to scale up and down based upon incoming
>> requests and might be able to reduce the burden/load on the load
>> balancers themselves.
>>
>> On Thu, Oct 7, 2021 at 10:16 AM Steve Niemitz <sniem...@apache.org>
>> wrote:
>>
>>> Unrelated to the actual question, but IIRC Dataflow workers have
>>> iptables rules that drop all inbound traffic (other than a few
>>> exceptions).
>>>
>>> In any case, do you actually need the server part to be "inside" the
>>> pipeline? Could you just use a JvmInitializer to launch the HTTP server
>>> and do the Pub/Sub publishing there?
>>>
>>> On Thu, Oct 7, 2021 at 12:53 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I would suggest that you instead write the requests received within
>>>> the splittable DoFn directly to a queue-based sink and, in another
>>>> part of the pipeline, read from that queue. For example, if you were
>>>> using Pub/Sub for the queue, your pipeline would look like:
>>>>
>>>> Create(LB config + pubsub topic A) -> ParDo(SDF: get request from
>>>> client, write to pubsub, then ack client)
>>>> Pubsub(Read from A) -> ?Deduplicate? -> ... downstream processing ...
>>>>
>>>> Since the SDF writes to Pub/Sub before it acknowledges the message,
>>>> you may write data that is never acked, and once the client retries
>>>> you'll publish a duplicate. If downstream processing is not resilient
>>>> to duplicates then you'll want some unique piece of information to
>>>> deduplicate on.
>>>>
>>>> Does the order in which you get requests, from one client or across
>>>> clients, matter? If yes, then be aware that the parallel processing
>>>> will affect the order in which you see things, and you might need to
>>>> have data sorted/ordered within the pipeline.
>>>>
>>>> On Wed, Oct 6, 2021 at 3:56 PM Daniel Collins <dpcoll...@google.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Bear with me, this is a bit of a weird one. I've been toying around
>>>>> with an idea to do HTTP ingestion using a Beam (specifically
>>>>> Dataflow) pipeline.
>>>>>
>>>>> The concept would be that you spin up an HTTP server on each running
>>>>> task, on a well-known port, as a static member of some class in the
>>>>> JAR (or upon initialization of an SDF the first time), then accept
>>>>> requests, but don't acknowledge them back to the client until the
>>>>> bundle finalizer
>>>>> <https://javadoc.io/static/org.apache.beam/beam-sdks-java-core/2.29.0/org/apache/beam/sdk/transforms/DoFn.BundleFinalizer.html>
>>>>> runs, so you know they're persisted / have moved down the pipeline.
>>>>> You could then use a load balancer pointed at the instance group
>>>>> created by Dataflow as the target for incoming requests, and create
>>>>> a PCollection from incoming user requests.
>>>>>
>>>>> The only part of this I don't think would work is preventing user
>>>>> requests from being stranded on a server that, due to load balancing
>>>>> constraints, will never run the SDF that would complete them. So my
>>>>> question is: is there a way to force an SDF to be run on every task
>>>>> where the JAR is loaded?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> -Dan
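
For reference, a rough sketch of the JvmInitializer approach discussed
above (untested; the class name, port, and Pub/Sub helper are all my own
placeholders):

import com.google.auto.service.AutoService;
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

// Discovered via ServiceLoader, so every worker JVM that loads the JAR
// runs this once, regardless of which DoFns get scheduled there.
@AutoService(JvmInitializer.class)
public class HttpIngestInitializer implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    try {
      HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
      server.createContext("/ingest", exchange -> {
        byte[] body = exchange.getRequestBody().readAllBytes();
        publishToPubsub(body); // hypothetical helper, see below
        exchange.sendResponseHeaders(204, -1);
        exchange.close();
      });
      server.start();
    } catch (Exception e) {
      throw new RuntimeException("Failed to start ingest server", e);
    }
  }

  private static void publishToPubsub(byte[] body) {
    // e.g. a com.google.cloud.pubsub.v1.Publisher; blocking on the publish
    // future before responding gives you "ack only once persisted" without
    // needing BundleFinalizer at all.
  }
}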
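
And the pipeline shape Luke describes, sketched against Pub/Sub (also
untested; HttpServingSdf, the topic/subscription paths, and the request_id
attribute are all made up):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class HttpIngestPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Branch 1: the SDF serves HTTP, publishes each request to topic A,
    // and only acks the client after the publish succeeds.
    p.apply("Config", Create.of("projects/my-project/topics/A"))
     .apply("ServeAndPublish", ParDo.of(new HttpServingSdf()));

    // Branch 2: read topic A back in. Because the SDF publishes before
    // acking, a client retry can produce a duplicate, so dedupe on a
    // unique request id the server attaches as a message attribute.
    PCollection<PubsubMessage> requests =
        p.apply("ReadA", PubsubIO.readMessagesWithAttributes()
            .fromSubscription("projects/my-project/subscriptions/A-sub")
            .withIdAttribute("request_id"));
    // ... downstream processing on `requests` ...

    p.run();
  }
}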
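
Finally, the "hold the ack until the bundle commits" piece from the
original message, as a sketch (untested; PendingRequest and the queue
draining are placeholders for whatever the embedded server buffers into,
and the real version would be an SDF with an unbounded restriction rather
than this plain DoFn):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class AckAfterCommitFn extends DoFn<String, byte[]> {
  // Placeholder for the server's buffered requests.
  interface PendingRequest {
    byte[] payload();
    void ack(); // send the HTTP 200 back to the waiting client
  }

  private static List<PendingRequest> drainPendingRequests() {
    return new ArrayList<>(); // placeholder: real impl drains the server queue
  }

  @ProcessElement
  public void process(ProcessContext c, BundleFinalizer finalizer) {
    List<PendingRequest> batch = drainPendingRequests();
    for (PendingRequest r : batch) {
      c.output(r.payload());
    }
    // Invoked only after the runner durably commits this bundle's output,
    // i.e. the requests have moved down the pipeline; only then ack.
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),
        () -> batch.forEach(PendingRequest::ack));
  }
}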