I've opened a JIRA for adding the generator source (BEAM-5707) and sent out
a very rough PR (https://github.com/apache/beam/pull/6637). Would
appreciate any feedback.
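
For anyone skimming the thread, the generator idea discussed below (a trigger
that emits empty byte arrays like Impulse, optionally bounded by an interval
and a max count, followed by a Map that builds payloads) can be sketched in
plain Python. This is only an illustration under those assumptions: it is not
the code in the PR and not a Beam or Flink API, and the names
impulse_generator, make_payload and run_test_pipeline are made up for the
example.

```python
import time
from typing import Callable, Iterator, List, Optional


def impulse_generator(interval_s: float = 0.0,
                      max_count: Optional[int] = None,
                      sleep: Callable[[float], None] = time.sleep
                      ) -> Iterator[bytes]:
    """Hypothetical stand-in for a runner-native trigger.

    Yields empty byte strings (as Impulse does), pausing interval_s seconds
    between elements, and stops after max_count elements (None = unbounded).
    """
    emitted = 0
    while max_count is None or emitted < max_count:
        yield b""
        emitted += 1
        # Only pause if another element will follow.
        if interval_s > 0 and (max_count is None or emitted < max_count):
            sleep(interval_s)


def make_payload(index: int, _trigger: bytes) -> dict:
    """Hypothetical Map function: turns a trigger into a test record."""
    return {"id": index, "payload": "x" * 8}


def run_test_pipeline(max_count: int, interval_s: float = 0.0) -> List[dict]:
    """Generator followed by a Map step, collected into a list."""
    triggers = impulse_generator(interval_s, max_count)
    return [make_payload(i, t) for i, t in enumerate(triggers)]


if __name__ == "__main__":
    for record in run_test_pipeline(max_count=3, interval_s=0.1):
        print(record)
```

In a real portable pipeline the trigger would come from the runner and the
Map step would be an SDK transform; the sketch only shows how the interval
and max count parameters bound the stream.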

On Mon, Oct 8, 2018 at 9:43 AM, Thomas Weise <t...@apache.org> wrote:

> The portable runner does not support metrics yet:
> https://s.apache.org/apache-beam-portability-support-table
>
> There is also no JIRA referenced in the table; it would be good to
> locate or create one.
>
> On Mon, Oct 8, 2018 at 9:11 AM Łukasz Gajowy <lukasz.gaj...@gmail.com>
> wrote:
>
>> Does anyone know the status of metrics support for the Flink Portable
>> Runner? I think we need metrics in such tests, at least to collect a
>> time metric that excludes cluster warm-up time, resource staging time,
>> and other things that can distort the actual run time. We should
>> probably use the metrics API in some other places too (as described in
>> the above-mentioned proposal).
>>
>>
>>
>> On Mon, Oct 8, 2018 at 12:12, Maximilian Michels <m...@apache.org> wrote:
>>
>>> This is correct. However, the example code is only part of Lyft's code
>>> base. Until timer support is done, we would have to do something similar
>>> in our code base.
>>>
>>> On 08.10.18 02:34, Łukasz Gajowy wrote:
>>> > Hi,
>>> >
>>> > just to clarify, judging from the above snippets: it seems that we can
>>> > now run tests that use a native source for data generation, and use
>>> > them in this form until Timers are supported. Once Timers are there,
>>> > we should consider switching to the Impulse + PTransform based
>>> > solution (described above) because it is more portable; the current
>>> > one is dedicated to Flink only (which is still really cool). Is this
>>> > correct, or am I missing something?
>>> >
>>> > Łukasz
>>> >
>>> > On Fri, Oct 5, 2018 at 14:04, Maximilian Michels <m...@apache.org> wrote:
>>> >
>>> >     Thanks for sharing your setup. You're right that we need timers to
>>> >     continuously ingest data to the testing pipeline.
>>> >
>>> >     Here is the Flink source which generates the data:
>>> >     https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42
>>> >
>>> >     On 04.10.18 19:31, Thomas Weise wrote:
>>> >      > FYI here is an example with native generator for portable Flink
>>> >      > runner:
>>> >      >
>>> >      > https://github.com/mwylde/beam/tree/micah_memory_leak
>>> >      >
>>> >      > https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
>>> >      >
>>> >      > You can use it to run the portable Flink runner in streaming mode
>>> >      > continuously for testing purposes.
>>> >      >
>>> >      >
>>> >      > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <t...@apache.org> wrote:
>>> >      >
>>> >      >
>>> >      >
>>> >      >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels
>>> >      >     <m...@apache.org> wrote:
>>> >      >
>>> >      >          > and then have Flink manage the parallelism for stages
>>> >      >          > downstream from that?
>>> >      >
>>> >      >         @Pablo Can you clarify what you mean by that?
>>> >      >
>>> >      >         Let me paraphrase this just to get a clear understanding.
>>> >      >         There are two approaches to test portable streaming
>>> >      >         pipelines:
>>> >      >
>>> >      >         a) Use an Impulse followed by a test PTransform which
>>> >      >         generates testing data. This is similar to how streaming
>>> >      >         sources work which don't use the Read Transform. For basic
>>> >      >         testing this should work, even without support for Timers.
>>> >      >
>>> >      >
>>> >      >     AFAIK this works for bounded sources and batch mode of the
>>> >      >     Flink runner (staged execution).
>>> >      >
>>> >      >     For streaming we need small bundles; we cannot have a Python
>>> >      >     ParDo that blocks in order to emit records periodically.
>>> >      >
>>> >      >     (With timers, the ParDo wouldn't block but instead schedule
>>> >      >     itself as needed.)
>>> >      >
>>> >      >         b) Introduce a new URN which gets translated to a native
>>> >      >         Flink/Spark/xy testing transform.
>>> >      >
>>> >      >         We should go for a) as this will make testing easier across
>>> >      >         portable runners. We previously discussed that native
>>> >      >         transforms will be an option in Beam, but it would be
>>> >      >         preferable to leave them out of testing for now.
>>> >      >
>>> >      >         Thanks,
>>> >      >         Max
>>> >      >
>>> >      >
>>> >      >         On 28.09.18 21:14, Thomas Weise wrote:
>>> >      >          > Thanks for sharing the link, this looks very promising!
>>> >      >          >
>>> >      >          > For the synthetic source, if we need a runner native
>>> >      >          > trigger mechanism, then it should probably just emit an
>>> >      >          > empty byte array like the impulse implementation does,
>>> >      >          > and everything else could be left to SDK specific
>>> >      >          > transforms that are downstream. We don't have support for
>>> >      >          > timers in the portable Flink runner yet. With timers,
>>> >      >          > there would not be the need for a runner native URN and
>>> >      >          > it could work just like Pablo described.
>>> >      >          >
>>> >      >          >
>>> >      >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
>>> >      >          > <lukasz.gaj...@gmail.com> wrote:
>>> >      >          >
>>> >      >          >     Hi all,
>>> >      >          >
>>> >      >          >     thank you, Thomas, for starting this discussion and
>>> >      >          >     Pablo for sharing the ideas. FWIW, we discussed this
>>> >      >          >     in terms of the Core Beam Transform Load Tests that
>>> >      >          >     we are working on right now [1]. If generating
>>> >      >          >     synthetic data becomes possible for portable
>>> >      >          >     streaming pipelines, we could use it in our work to
>>> >      >          >     test Python streaming scenarios.
>>> >      >          >
>>> >      >          >     [1] https://s.apache.org/GVMa
>>> >      >          >
>>> >      >          >     On Fri, Sep 28, 2018 at 08:18, Pablo Estrada
>>> >      >          >     <pabl...@google.com> wrote:
>>> >      >          >
>>> >      >          >         Hi Thomas, all,
>>> >      >          >         yes, this is quite important for testing, and in
>>> >      >          >         fact I'd think it's important to streamline the
>>> >      >          >         insertion of native sources from different runners
>>> >      >          >         to make the current runners more usable. But that's
>>> >      >          >         another topic.
>>> >      >          >
>>> >      >          >         For generators of synthetic data, I had a couple
>>> >      >          >         ideas (and this will show my limited knowledge
>>> >      >          >         about Flink and Streaming, but oh well):
>>> >      >          >
>>> >      >          >         - Flink experts: Is it possible to add a pure-Beam
>>> >      >          >         generator that will do something like: Impulse ->
>>> >      >          >         ParDo(generate multiple elements) -> Forced "Write"
>>> >      >          >         to Flink (e.g. something like a reshuffle), and
>>> >      >          >         then have Flink manage the parallelism for stages
>>> >      >          >         downstream from that?
>>> >      >          >
>>> >      >          >         - If this is not possible, it may be worth writing
>>> >      >          >         some transform in Flink / other runners that can be
>>> >      >          >         plugged in by inserting a custom URN. In fact, it
>>> >      >          >         may be a good idea to streamline the insertion of
>>> >      >          >         native sources for each runner based on some sort
>>> >      >          >         of CustomURNTransform() ?
>>> >      >          >
>>> >      >          >         I hope I did not butcher those explanations too
>>> >      >          >         badly...
>>> >      >          >         Best
>>> >      >          >         -P.
>>> >      >          >
>>> >      >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
>>> >      >          >         <t...@apache.org> wrote:
>>> >      >          >
>>> >      >          >             There were a few discussions about how we can
>>> >      >          >             facilitate testing for portable streaming
>>> >      >          >             pipelines with the Flink runner. The problem is
>>> >      >          >             that we currently don't have streaming sources
>>> >      >          >             in the Python SDK.
>>> >      >          >
>>> >      >          >             One way to support testing could be a generator
>>> >      >          >             that extends the idea of Impulse to provide a
>>> >      >          >             Flink native trigger transform, optionally
>>> >      >          >             parameterized with an interval and max count.
>>> >      >          >
>>> >      >          >             Test pipelines could then follow the generator
>>> >      >          >             with a Map function that creates whatever
>>> >      >          >             payloads are desirable.
>>> >      >          >
>>> >      >          >             Thoughts?
>>> >      >          >
>>> >      >          >             Thanks,
>>> >      >          >             Thomas
>>> >      >          >
>>> >      >
>>> >
>>>
>>
