Hi, The example in that talk doesn't sort elements by timestamp, only putting them into buckets (windows) is based on time and the processing of those buckets is triggered by an advancing watermark. Even with just one input it's not (easily) possible to sort elements by timestamp. However, it is possible using buffering and if you discard late elements, as I outlined in my earlier answer.
Best, Aljoscha > On 28. Aug 2017, at 13:07, ziv <ziv.m...@elbitsystems.com> wrote: > > Hi Aljoscha, > > Since you answered the question you definitely can answer the bonus. I’ll be > grateful if you explain why using watermarks as you showed in notion of time > cant help here. > > Thanks. > > > > From: Aljoscha Krettek-2 [via Apache Flink Mailing List archive.] > [mailto:ml+s1008284n19405...@n3.nabble.com > <mailto:ml+s1008284n19405...@n3.nabble.com>] > Sent: 28 August 2017 12:36 > To: Meri Ziv > Subject: Re: Sync Flink > > Hi, > > This is not possible out-of-box, but you could use a ProcessFunction (or > rather CoProcessFunction) to buffer elements and set a timer so that you only > emit when the watermarks advances on both inputs. > > Best, > Aljoscha > >> On 28. Aug 2017, at 08:10, ziv <[hidden >> email]</user/SendEmail.jtp?type=node&node=19405&i=0>> wrote: >> >> People, >> >> I have two sources: >> >> 1) DataStream<Integer> int1 = env.addSource(new intWithDelat1()): generates >> series of integers in streaming of 1 sec delay between elemets. >> >> 2) DataStream<Long> long3 = env.addSource(new longWithDelay3()): generates >> series of longs in streaming of 3 sec delay. >> >> I want to: >> >> int1.connect(long3).flatMap(new printElemnts()); >> >> I want this transformation to be synchronized so that element from source 1 >> will be read only when the equivalent element from source 2 comes: >> 1 >> 1 >> 2 >> 2 >> 3 >> 3 >> … >> >> And not: >> 1 >> 1 >> 2 >> 3 >> 2 >> 4 >> 5 >> … >> >> any way to do that? >> >> (advanced: why things not similar to the example in 'notions of time' >> <https://www.youtube.com/watch?v=xiKsOocNkDA> at 11:20?) >> >> Thanks >> >> >> >> -- >> View this message in context: >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sync-Flink-tp19403.html >> Sent from the Apache Flink Mailing List archive. mailing list archive at >> Nabble.com. > > > ________________________________ > If you reply to this email, your message will be added to the discussion > below: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sync-Flink-tp19403p19405.html > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sync-Flink-tp19403p19405.html> > To start a new topic under Apache Flink Mailing List archive., email > ml+s1008284n1...@n3.nabble.com > <mailto:ml+s1008284n1...@n3.nabble.com><mailto:ml+s1008284n1...@n3.nabble.com > <mailto:ml+s1008284n1...@n3.nabble.com>> > To unsubscribe from Sync Flink, click > here<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=19403&code=eml2Lm1lcmlAZWxiaXRzeXN0ZW1zLmNvbXwxOTQwM3wxNzE4MzA5NDcz > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=19403&code=eml2Lm1lcmlAZWxiaXRzeXN0ZW1zLmNvbXwxOTQwM3wxNzE4MzA5NDcz>>. > NAML<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>> > > The information in this e-mail transmission contains proprietary and business > sensitive information. Unauthorized interception of this e-mail may > constitute > a violation of law. If you are not the intended recipient, you are hereby > notified that any review, dissemination, distribution or duplication of this > communication is strictly prohibited. You are also asked to contact the > sender > by reply email and immediately destroy all copies of the original message. > > > > > -- > View this message in context: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sync-Flink-tp19403p19408.html > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sync-Flink-tp19403p19408.html> > Sent from the Apache Flink Mailing List archive. mailing list archive at > Nabble.com <http://nabble.com/>.