I've opened a JIRA for adding the generator source (BEAM-5707) and sent out a very rough PR (https://github.com/apache/beam/pull/6637). Would appreciate any feedback.
On Mon, Oct 8, 2018 at 9:43 AM, Thomas Weise <t...@apache.org> wrote:

The portable runner does not support metrics yet: https://s.apache.org/apache-beam-portability-support-table

There is also no JIRA referenced in the table; it would be good to locate or create one.

On Mon, Oct 8, 2018 at 9:11 AM Łukasz Gajowy <lukasz.gaj...@gmail.com> wrote:

Does anyone know the status of metrics support in the portable Flink runner? I think we need metrics in such tests, at the very least to collect a run-time metric that excludes cluster warm-up time, resource staging time, and other overhead that can distort the actual run time. We should probably also use the metrics API in some other places (as described in the above-mentioned proposal).

On Mon, Oct 8, 2018 at 12:12 Maximilian Michels <m...@apache.org> wrote:

This is correct. However, the example code is only part of Lyft's code base. Until timer support is in place, we would have to do something similar in our own code base.

On 08.10.18 02:34, Łukasz Gajowy wrote:

Hi,

Just to clarify, judging from the snippets above: it seems that we are now able to run tests that use a native source for data generation, and we can keep using them in this form until timers are supported. Once timers are available, we should consider switching to the Impulse + PTransform-based solution (described above) because it is more portable; the current one is dedicated to Flink only (which is still really cool). Is this correct, or am I missing something?

Łukasz

On Fri, Oct 5, 2018 at 14:04 Maximilian Michels <m...@apache.org> wrote:

Thanks for sharing your setup. You're right that we need timers to continuously ingest data into the testing pipeline.
Here is the Flink source which generates the data:
https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42

On 04.10.18 19:31, Thomas Weise wrote:

FYI, here is an example with a native generator for the portable Flink runner:

https://github.com/mwylde/beam/tree/micah_memory_leak
https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py

You can use it to run the portable Flink runner continuously in streaming mode for testing purposes.

On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <t...@apache.org> wrote:

> On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <m...@apache.org> wrote:
>
>> and then have Flink manage the parallelism for stages downstream from that?
>
> @Pablo Can you clarify what you mean by that?
>
> Let me paraphrase this just to get a clear understanding. There are two approaches to testing portable streaming pipelines:
>
> a) Use an Impulse followed by a test PTransform which generates testing data. This is similar to how streaming sources that don't use the Read transform work. For basic testing this should work, even without support for timers.

AFAIK this works for bounded sources and the batch mode of the Flink runner (staged execution).

For streaming we need small bundles; we cannot have a Python ParDo that blocks while emitting records periodically.
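Thomas's point about small bundles can be illustrated with a plain-Python simulation. This is not Beam code: the function names are hypothetical, and it assumes a runner that makes a bundle's output visible downstream only once that bundle is committed. A virtual clock plugged into the standard-library `sched` module keeps it instant. The blocking version holds every record in one bundle until it returns; the timer-style version emits one record per firing and reschedules itself.

```python
import sched
from typing import List, Tuple


class VirtualClock:
    """Fake clock so the simulation runs instantly instead of really sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def blocking_generator(count: int, interval: float) -> List[Tuple[float, int]]:
    """A DoFn that blocks between records and returns only after the last one.

    Returns (visible_at, record) pairs. All records sit in a single bundle, so
    none of them is visible downstream before that bundle is committed.
    """
    clock = VirtualClock()
    bundle = []
    for i in range(count):
        if i > 0:
            clock.sleep(interval)  # simulated blocking wait inside process()
        bundle.append(i)
    commit_time = clock.time()
    return [(commit_time, record) for record in bundle]


def timer_generator(count: int, interval: float) -> List[Tuple[float, int]]:
    """Timer style: each firing emits one record in its own small bundle and
    schedules the next firing, so nothing ever blocks."""
    clock = VirtualClock()
    scheduler = sched.scheduler(clock.time, clock.sleep)
    visible: List[Tuple[float, int]] = []

    def on_timer(i: int) -> None:
        visible.append((clock.time(), i))  # small bundle committed immediately
        if i + 1 < count:
            scheduler.enter(interval, 1, on_timer, (i + 1,))

    if count > 0:
        scheduler.enter(0, 1, on_timer, (0,))
    scheduler.run()
    return visible


if __name__ == "__main__":
    print(blocking_generator(3, 1.0))  # [(2.0, 0), (2.0, 1), (2.0, 2)]
    print(timer_generator(3, 1.0))     # [(0.0, 0), (1.0, 1), (2.0, 2)]
```

With the blocking DoFn, the first record only becomes visible when the last one is produced; in an unbounded streaming test that moment never comes. The timer-style loop makes each record visible as soon as it is produced.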
(With timers, the ParDo wouldn't block but would instead schedule itself as needed.)

> b) Introduce a new URN which gets translated to a native Flink/Spark/xy testing transform.
>
> We should go for a), as this will make testing easier across portable runners. We previously discussed that native transforms will be an option in Beam, but it would be preferable to leave them out of testing for now.
>
> Thanks,
> Max

On 28.09.18 21:14, Thomas Weise wrote:

Thanks for sharing the link, this looks very promising!

For the synthetic source, if we need a runner-native trigger mechanism, then it should probably just emit an empty byte array like the Impulse implementation does, and everything else could be left to SDK-specific transforms downstream. We don't have support for timers in the portable Flink runner yet. With timers, there would be no need for a runner-native URN, and it could work just like Pablo described.

On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy <lukasz.gaj...@gmail.com> wrote:

Hi all,

Thank you, Thomas, for starting this discussion, and Pablo for sharing the ideas. FWIW, adding here that we discussed this in the context of the Core Beam Transform Load Tests that we are working on right now [1].
If generating synthetic data becomes possible for portable streaming pipelines, we could use it in our work to test Python streaming scenarios.

[1] https://s.apache.org/GVMa

On Fri, Sep 28, 2018 at 08:18 Pablo Estrada <pabl...@google.com> wrote:

Hi Thomas, all,
Yes, this is quite important for testing, and in fact I'd think it's important to streamline the insertion of native sources from different runners to make the current runners more usable. But that's another topic.

For generators of synthetic data, I had a couple of ideas (and this will show my limited knowledge about Flink and streaming, but oh well):

- Flink experts: Is it possible to add a pure-Beam generator that does something like: Impulse -> ParDo(generate multiple elements) -> forced "Write" to Flink (e.g. something like a reshuffle), and then have Flink manage the parallelism for stages downstream from that?

- If this is not possible, it may be worth writing some transform in Flink / other runners that can be plugged in by inserting a custom URN. In fact, it may be a good idea to streamline the insertion of native sources for each runner based on some sort of CustomURNTransform()?

I hope I did not butcher those explanations too badly...
Best
-P.
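Pablo's first idea can be modeled in plain Python to show why the forced "Write" matters. The fan-out after a single Impulse element happens on one worker, and a reshuffle is what spreads the generated elements across parallel downstream workers. The sketch below is a hypothetical simulation, not the Beam API: it assigns each element a random key and routes it by key modulo the parallelism, which is roughly the idea behind Beam's Reshuffle (random keys followed by a grouping), while the real runner mechanics differ.

```python
import random
from collections import defaultdict
from typing import Dict, Iterable, List


def impulse() -> List[bytes]:
    """Like Beam's Impulse: a single empty byte-string element."""
    return [b""]


def generate(impulse_elements: Iterable[bytes], count: int) -> List[int]:
    """Fan-out step (the 'ParDo(generate multiple elements)'); runs on one worker."""
    return [i for _ in impulse_elements for i in range(count)]


def reshuffle(elements: Iterable[int], parallelism: int, seed: int = 0) -> Dict[int, List[int]]:
    """Give each element a random key and route it to worker key % parallelism.

    Models the redistribution a reshuffle introduces so that downstream
    stages can run with the runner's own parallelism.
    """
    rng = random.Random(seed)
    workers: Dict[int, List[int]] = defaultdict(list)
    for element in elements:
        key = rng.getrandbits(32)
        workers[key % parallelism].append(element)
    return dict(workers)


if __name__ == "__main__":
    shards = reshuffle(generate(impulse(), 1000), parallelism=4)
    for worker, items in sorted(shards.items()):
        print(worker, len(items))
```

Without the reshuffle step, all 1000 elements would stay on the worker that ran the fan-out.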
On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <t...@apache.org> wrote:

There were a few discussions about how we can facilitate testing of portable streaming pipelines with the Flink runner. The problem is that we currently don't have streaming sources in the Python SDK.

One way to support testing could be a generator that extends the idea of Impulse to provide a Flink-native trigger transform, optionally parameterized with an interval and a max count.

Test pipelines could then follow the generator with a Map function that creates whatever payloads are desirable.

Thoughts?

Thanks,
Thomas
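The division of labor in this proposal can be sketched in plain Python. The trigger part only emits empty byte-string ticks (like Impulse) at a fixed interval, up to an optional max count, and everything test-specific lives in the Map that follows. This is a hypothetical illustration, not a Beam or Flink API: `trigger_ticks` and `make_payload` are made-up names, and in the proposal the ticking would happen inside the Flink runner rather than in Python.

```python
import time
from typing import Callable, Dict, Iterator, Optional, Tuple


def trigger_ticks(
    interval_secs: float,
    max_count: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[bytes]:
    """Emit an empty byte string every interval_secs.

    Stops after max_count ticks; runs forever when max_count is None, as a
    continuously running streaming test would want. `sleep` is injectable
    so tests do not have to wait in real time.
    """
    emitted = 0
    while max_count is None or emitted < max_count:
        if emitted > 0:
            sleep(interval_secs)
        yield b""
        emitted += 1


def make_payload(index_and_tick: Tuple[int, bytes]) -> Dict[str, object]:
    """Downstream 'Map' step: turn each tick into whatever test record is needed."""
    index, _tick = index_and_tick
    return {"id": index, "value": f"record-{index}"}


if __name__ == "__main__":
    ticks = trigger_ticks(interval_secs=0.1, max_count=3)
    for payload in map(make_payload, enumerate(ticks)):
        print(payload)
```

Keeping the trigger payload-free means the runner-native part stays tiny and generic, while each test controls its own data shape in SDK code.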