Thanks for sharing your setup. You're right that we need timers to continuously ingest data into the testing pipeline.

Here is the Flink source which generates the data:
https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42

On 04.10.18 19:31, Thomas Weise wrote:
FYI here is an example with a native generator for the portable Flink runner:

https://github.com/mwylde/beam/tree/micah_memory_leak
https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py

You can use it to run the portable Flink runner in streaming mode continuously for testing purposes.
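
Roughly, a test pipeline submitted to the portable Flink runner takes the
following shape (a rough sketch, not the linked example itself; the job
endpoint, the options, and the generation step here are assumptions on my
side):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Assumes a Flink job server listening on localhost:8099.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
        '--streaming',
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | 'Impulse' >> beam.Impulse()
         # Placeholder generation step; producing data *continuously* is
         # what the native generator / timer discussion below is about.
         | 'Generate' >> beam.FlatMap(lambda seed: (b'x' * 1024 for _ in range(1000)))
         | 'Size' >> beam.Map(len))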


On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <[email protected]> wrote:



    On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <[email protected]> wrote:

         > and then have Flink manage the parallelism for stages
         > downstream from that?

        @Pablo Can you clarify what you mean by that?

        Let me paraphrase this just to get a clear understanding. There are
        two approaches to test portable streaming pipelines:

        a) Use an Impulse followed by a test PTransform which generates
        testing data. This is similar to how streaming sources that don't
        use the Read Transform work. For basic testing this should work,
        even without support for Timers.
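
        For illustration, a minimal sketch of a) with the Python SDK (the
        DoFn and the payloads below are made up for the example):

            import apache_beam as beam

            class GenerateTestData(beam.DoFn):
                def process(self, impulse):
                    # Fan the single empty byte string from Impulse out
                    # into a batch of synthetic, keyed test records.
                    for i in range(1000):
                        yield ('key-%d' % (i % 10), b'payload-%d' % i)

            with beam.Pipeline() as p:
                (p
                 | 'Impulse' >> beam.Impulse()
                 | 'Generate' >> beam.ParDo(GenerateTestData()))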


    AFAIK this works for bounded sources and batch mode of the Flink
    runner (staged execution).

    For streaming we need small bundles; we cannot have a Python ParDo
    block in order to emit records periodically.

    (With timers, the ParDo wouldn't block but instead schedule itself
    as needed.)
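
    A rough sketch of what such a timer-driven generator could look like
    in the Python SDK once timers are supported in the portable runner;
    the names and the one-second interval are purely illustrative:

        import time

        import apache_beam as beam
        from apache_beam.transforms.timeutil import TimeDomain
        from apache_beam.transforms.userstate import TimerSpec, on_timer
        from apache_beam.utils.timestamp import Timestamp

        class PeriodicEmitFn(beam.DoFn):
            # Processing-time timer used to re-schedule the DoFn itself.
            EMIT_TIMER = TimerSpec('emit', TimeDomain.REAL_TIME)

            def process(self, element,
                        timer=beam.DoFn.TimerParam(EMIT_TIMER)):
                # Timer/state DoFns need keyed input, e.g.
                # Impulse() | Map(lambda b: ('key', b)) upstream.
                # Instead of blocking, arm the timer for the first emit.
                timer.set(Timestamp(seconds=time.time() + 1))

            @on_timer(EMIT_TIMER)
            def emit(self, timer=beam.DoFn.TimerParam(EMIT_TIMER)):
                # Emit one record per firing and re-arm the timer, so
                # bundles stay small and nothing blocks.
                timer.set(Timestamp(seconds=time.time() + 1))
                yield b''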

        b) Introduce a new URN which gets translated to a native
        Flink/Spark/xy testing transform.

        We should go for a) as this will make testing easier across
        portable runners. We previously discussed that native transforms
        will be an option in Beam, but it would be preferable to leave
        them out of testing for now.

        Thanks,
        Max


        On 28.09.18 21:14, Thomas Weise wrote:
         > Thanks for sharing the link, this looks very promising!
         >
         > For the synthetic source, if we need a runner native trigger
         > mechanism, then it should probably just emit an empty byte array
         > like the impulse implementation does, and everything else could
         > be left to SDK-specific transforms that are downstream. We don't
         > have support for timers in the portable Flink runner yet. With
         > timers, there would not be the need for a runner native URN and
         > it could work just like Pablo described.
         >
         >
         > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
         > <[email protected]> wrote:
         >
         >     Hi all,
         >
         >     thank you, Thomas, for starting this discussion and Pablo
         >     for sharing the ideas. FWIW adding here, we discussed this
         >     in terms of Core Beam Transform Load Tests that we are
         >     working on right now [1]. If generating synthetic data will
         >     be possible for portable streaming pipelines, we could use
         >     it in our work to test Python streaming scenarios.
         >
         >     [1] https://s.apache.org/GVMa
         >
         >     On Fri, Sep 28, 2018 at 08:18 Pablo Estrada
         >     <[email protected]> wrote:
         >
         >         Hi Thomas, all,
         >         yes, this is quite important for testing, and in fact
         >         I'd think it's important to streamline the insertion of
         >         native sources from different runners to make the
         >         current runners more usable. But that's another topic.
         >
         >         For generators of synthetic data, I had a couple ideas
         >         (and this will show my limited knowledge about Flink and
         >         Streaming, but oh well):
         >
         >         - Flink experts: Is it possible to add a pure-Beam
         >         generator that will do something like: Impulse ->
         >         ParDo(generate multiple elements) -> Forced "Write" to
         >         Flink (e.g. something like a reshuffle), and then have
         >         Flink manage the parallelism for stages downstream from
         >         that?
         >
         >         - If this is not possible, it may be worth writing some
         >         transform in Flink / other runners that can be plugged
         >         in by inserting a custom URN. In fact, it may be a good
         >         idea to streamline the insertion of native sources for
         >         each runner based on some sort of CustomURNTransform()?
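         >
         >         (For concreteness, a small sketch of the first idea in
         >         the Python SDK; Reshuffle stands in for the forced
         >         "Write", and the generated elements are made up:)
         >
         >             import apache_beam as beam
         >
         >             with beam.Pipeline() as p:
         >                 (p
         >                  | beam.Impulse()
         >                  # Fan the single impulse out into many elements.
         >                  | beam.FlatMap(lambda seed: range(10000))
         >                  # Break fusion so the runner can redistribute
         >                  # the downstream stages.
         >                  | beam.Reshuffle()
         >                  | beam.Map(lambda x: x * 2))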
         >
         >         I hope I did not butcher those explanations too badly...
         >         Best
         >         -P.
         >
         >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
         >         <[email protected]> wrote:
         >
         >             There were a few discussions about how we can
         >             facilitate testing for portable streaming pipelines
         >             with the Flink runner. The problem is that we
         >             currently don't have streaming sources in the
         >             Python SDK.
         >
         >             One way to support testing could be a generator
         >             that extends the idea of Impulse to provide a Flink
         >             native trigger transform, optionally parameterized
         >             with an interval and max count.
         >
         >             Test pipelines could then follow the generator with
         >             a Map function that creates whatever payloads are
         >             desirable.
         >
         >             Thoughts?
         >
         >             Thanks,
         >             Thomas
         >
