Hi,
yes, I'm afraid you need a custom operator for that. (We are working on
providing built-in support for this, though.)

I sketched an Operator that does the sorting and also wrote a quick example
that uses it:
SortedWindowOperator:
https://gist.github.com/aljoscha/6600bc1121b7f8a0f68b89988dd341bd
SortedWindowExample:
https://gist.github.com/aljoscha/959fc61aff2cd774fe60da711f5ae40b

It puts incoming elements into buckets based on their timestamps and sorts
the elements of a bucket once the watermark passes the end of that bucket.
Take a look at processWatermark(); that is where you would fill in your
custom logic.
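As a rough illustration of the bucketing idea, here is a plain-Java sketch with no Flink types: elements go into fixed-size time buckets, and a bucket is emitted in timestamp order once the watermark reaches its last timestamp. This is not the code from the gists; the class name WatermarkSortBuffer, the bucket size, and the add/onWatermark methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink types) of "bucket by timestamp, sort when the
// watermark passes the bucket". All names here are hypothetical.
class WatermarkSortBuffer<T> {

    // An element together with its event timestamp.
    private static final class Timestamped<V> {
        final V value;
        final long timestamp;

        Timestamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long bucketSize;
    // Bucket start -> elements with timestamps in [start, start + bucketSize), unsorted.
    private final TreeMap<Long, List<Timestamped<T>>> buckets = new TreeMap<>();

    WatermarkSortBuffer(long bucketSize) {
        if (bucketSize <= 0) throw new IllegalArgumentException("bucketSize must be positive");
        this.bucketSize = bucketSize;
    }

    void add(T value, long timestamp) {
        long start = Math.floorDiv(timestamp, bucketSize) * bucketSize;
        buckets.computeIfAbsent(start, k -> new ArrayList<>())
                .add(new Timestamped<>(value, timestamp));
    }

    // Emits, sorted by timestamp, every bucket whose last timestamp is <= watermark.
    List<T> onWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        while (!buckets.isEmpty()) {
            Map.Entry<Long, List<Timestamped<T>>> first = buckets.firstEntry();
            if (first.getKey() + bucketSize - 1 > watermark) {
                break; // this bucket and all later ones may still receive elements
            }
            List<Timestamped<T>> bucket = first.getValue();
            bucket.sort(Comparator.comparingLong(e -> e.timestamp)); // stable sort
            for (Timestamped<T> e : bucket) {
                out.add(e.value);
            }
            buckets.pollFirstEntry();
        }
        return out;
    }

    public static void main(String[] args) {
        WatermarkSortBuffer<String> buffer = new WatermarkSortBuffer<>(5);
        buffer.add("t=9", 9);
        buffer.add("t=15", 15);
        buffer.add("t=14", 14);
        buffer.add("t=10", 10);
        System.out.println(buffer.onWatermark(9));  // [t=9]
        System.out.println(buffer.onWatermark(14)); // [t=10, t=14]
        System.out.println(buffer.onWatermark(20)); // [t=15]
    }
}
```

In a real operator, the body of onWatermark would roughly correspond to the custom logic in processWatermark(), with the sorted elements forwarded downstream.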

Cheers,
Aljoscha

On Fri, 15 Apr 2016 at 14:41 Gary Verhaegen <gary.verhae...@euranova.eu>
wrote:

> Hi Aljoscha,
>
> What I'm looking for is an operator that joins two streams together, but
> keeps the events in timestamp order.
>
> What I was trying to do with the window specification comes down to this:
> for each event on that stream, I want to call a function with that event
> and all of the events that occurred within 5 minutes after it. So
> conceptually it is a sliding window, except that the width is defined in
> terms of time and the step in terms of count.
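A minimal sketch of that window shape, assuming the events are already sorted by timestamp and that "within 5 minutes after it" means the half-open interval [t, t + 5 min): a two-pointer scan yields one window per event, so the step is one event while the width is a time span. EventAnchoredWindows and windows are hypothetical names, and timestamps are plain longs in whatever unit width uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: one window per event, containing that event and every later event whose
// timestamp is < (its timestamp + width). Assumes input sorted by timestamp.
// Class and method names are hypothetical.
class EventAnchoredWindows {

    static List<List<Long>> windows(List<Long> sortedTimestamps, long width) {
        if (width <= 0) throw new IllegalArgumentException("width must be positive");
        List<List<Long>> result = new ArrayList<>();
        int end = 0; // exclusive end of the current window; never moves backwards
        for (int start = 0; start < sortedTimestamps.size(); start++) {
            long limit = sortedTimestamps.get(start) + width;
            while (end < sortedTimestamps.size() && sortedTimestamps.get(end) < limit) {
                end++;
            }
            result.add(new ArrayList<>(sortedTimestamps.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps in minutes, window width of 5 minutes.
        List<Long> ts = Arrays.asList(0L, 2L, 4L, 6L, 11L);
        System.out.println(windows(ts, 5)); // [[0, 2, 4], [2, 4, 6], [4, 6], [6], [11]]
    }
}
```

Because both pointers only move forward, the scan is linear in the number of events (plus the cost of copying each window).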
>
> What I will really need to do is probably manage the window myself in
> operator state (in many cases I expect not to need the whole window, so
> it may be worthwhile to evaluate it eagerly), but for that I think I
> really need the events to arrive in order.
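If the events do arrive in timestamp order, the eager variant can be sketched in plain Java: each new event extends every open window, a window is emitted as soon as a caller-supplied predicate says it has seen enough, and any other window is emitted once an event beyond its time limit arrives. The class name and the isDone predicate are hypothetical stand-ins for whatever the real evaluation needs; the in-order assumption is exactly the one discussed in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Sketch of eager, in-order evaluation of event-anchored time windows.
// Assumes events arrive in timestamp order; names and isDone are hypothetical.
class EagerAnchoredWindows {
    private final long width;
    private final Predicate<List<Long>> isDone;                     // "seen enough" test
    private final ArrayDeque<List<Long>> open = new ArrayDeque<>(); // ordered by anchor

    EagerAnchoredWindows(long width, Predicate<List<Long>> isDone) {
        this.width = width;
        this.isDone = isDone;
    }

    // Processes one event and returns the windows that were completed by it.
    List<List<Long>> onEvent(long ts) {
        List<List<Long>> completed = new ArrayList<>();
        // Windows whose time limit has passed cannot grow any more; they sit at the front.
        while (!open.isEmpty() && open.peekFirst().get(0) + width <= ts) {
            completed.add(open.pollFirst());
        }
        // Extend the remaining windows and close those that have seen enough.
        for (Iterator<List<Long>> it = open.iterator(); it.hasNext(); ) {
            List<Long> window = it.next();
            window.add(ts);
            if (isDone.test(window)) {
                completed.add(window);
                it.remove();
            }
        }
        // Every event also starts a window of its own.
        List<Long> fresh = new ArrayList<>();
        fresh.add(ts);
        if (isDone.test(fresh)) {
            completed.add(fresh);
        } else {
            open.addLast(fresh);
        }
        return completed;
    }

    // End of input: whatever is still open is emitted as is.
    List<List<Long>> flush() {
        List<List<Long>> rest = new ArrayList<>(open);
        open.clear();
        return rest;
    }

    public static void main(String[] args) {
        EagerAnchoredWindows w = new EagerAnchoredWindows(5, win -> win.size() >= 3);
        for (long ts : new long[] {0, 2, 4, 6, 11}) {
            System.out.println(ts + " -> " + w.onEvent(ts));
        }
        System.out.println("flush -> " + w.flush());
    }
}
```

With width 5 and "three events seen" as the predicate, the window [0, 2, 4] is emitted when the event at 4 arrives rather than when an event at or after 5 shows up. If events could arrive out of order, the front-expiry step would be wrong, which is why ordering matters here.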
>
> On 14 April 2016 at 17:04, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Hi,
> > Flink does not make any guarantees about the order in which elements
> > arrive, except for one-to-one forwarding patterns. That is, only for
> > map/flatMap/filter and similar operations will two successive operations
> > see their elements in the same order.
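To make the ordering point concrete, here is a simplified model in plain Java: an operator reads from two input channels, each delivering its own elements in order, and may see any interleaving of the two. Assumptions: exactly two channels and distinct elements (a real job can have more channels, depending on parallelism). isInterleaving is a hypothetical helper that checks whether an observed order is consistent with this model.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model: two input channels, each in order, merged arbitrarily.
// Assumes all elements are distinct. Names are hypothetical.
class InterleavingCheck {

    // True if observed is some merge of a and b that keeps each channel's own order.
    static <T> boolean isInterleaving(List<T> observed, List<T> a, List<T> b) {
        if (observed.size() != a.size() + b.size()) return false;
        int i = 0, j = 0; // next expected element from each channel
        for (T x : observed) {
            if (i < a.size() && a.get(i).equals(x)) {
                i++;
            } else if (j < b.size() && b.get(j).equals(x)) {
                j++;
            } else {
                return false; // x is out of order within its channel (or unknown)
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> channel1 = Arrays.asList(9, 15);
        List<Integer> channel2 = Arrays.asList(10, 14);
        System.out.println(isInterleaving(Arrays.asList(9, 15, 10, 14), channel1, channel2)); // true
        System.out.println(isInterleaving(Arrays.asList(9, 10, 14, 15), channel1, channel2)); // true
        System.out.println(isInterleaving(Arrays.asList(15, 9, 10, 14), channel1, channel2)); // false
    }
}
```

With the values from the test further down in this thread, 9, 15, 10, 14 is a valid interleaving if 9 and 15 travelled on one channel and 10 and 14 on another, even though it is not sorted by timestamp.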
> >
> > Could you please describe in prose what the expected outcome of your
> > windowing specification is? We can start from that and try to figure out
> > how to make Flink behave as it should.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <gary.verhae...@euranova.eu>
> > wrote:
> >
> > > Hi list,
> > >
> > > I am surprised by the behaviour of the code below. In particular, I am
> > > puzzled by the fact that events do not seem to enter the window in
> > > order. What am I doing wrong?
> > >
> > > Here's what I don't understand. This test outputs the following error:
> > >
> > > java.lang.AssertionError: expected:<[
> > >     [10 "Join(Left,Right)" {"Left:@t": 10, "Left:attr": 1, "Right:@t": 9, "Right:val": 1}],
> > >     [15 "Join(Left,Right)" {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]>
> > > but was:<[
> > >     [9 "None" {"times": [9]}],
> > >     [12 "None" {"times": [12]}],
> > >     [17 "None" {"times": [17]}],
> > >     [9 "None" {"times": [9, 15]}],
> > >     [9 "None" {"times": [9, 15, 10]}],
> > >     [9 "None" {"times": [9, 15, 10, 14]}]]>
> > >
> > > Now, the test is not complete, so it's not surprising that it fails, but
> > > what really puzzles me is that there appears to be a moment when my
> > > window contains an event at time 9 and an event at time 15, but does not
> > > yet include the events at times 10 and 14, which should be part of the
> > > same stream (and are indeed added later).
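The cumulative lists in the output above are consistent with how GlobalWindows combined with CountTrigger.of(1) behaves: the trigger fires on every element without purging, so each firing hands the window function everything collected for that key so far, in arrival order. A plain-Java model of that behaviour (no Flink types; CumulativeFiringModel and simulate are hypothetical names), fed with the key-1 arrival order implied by the output:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink types) of GlobalWindows + CountTrigger.of(1):
// every element is added to its key's window, and each firing sees the full
// contents so far, because nothing is purged. Names are hypothetical.
class CumulativeFiringModel {

    // Each row is {key, timestamp}; returns what each firing would see, in order.
    static List<List<Integer>> simulate(int[][] arrivals) {
        Map<Integer, List<Integer>> windowContents = new LinkedHashMap<>();
        List<List<Integer>> firings = new ArrayList<>();
        for (int[] arrival : arrivals) {
            List<Integer> contents =
                    windowContents.computeIfAbsent(arrival[0], k -> new ArrayList<>());
            contents.add(arrival[1]);
            firings.add(new ArrayList<>(contents)); // snapshot of the window at this firing
        }
        return firings;
    }

    public static void main(String[] args) {
        // Key 1 (attr/val == 1) in the arrival order implied by the quoted output.
        int[][] key1 = {{1, 9}, {1, 15}, {1, 10}, {1, 14}};
        System.out.println(simulate(key1)); // [[9], [9, 15], [9, 15, 10], [9, 15, 10, 14]]
    }
}
```

So the window function itself behaves as written; the unexpected part is only the arrival order 9, 15, 10, 14.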
> > >
> > > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
> > > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
> > > relevant parts of the pom.xml uncommented.
> > >
> > > package enx.cep;
> > >
> > > import static org.junit.Assert.assertEquals;
> > >
> > > import org.apache.flink.streaming.api.TimeCharacteristic;
> > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
> > > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > > import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> > > import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> > > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> > > import org.apache.flink.util.Collector;
> > > import org.junit.Test;
> > >
> > > import java.io.*;
> > > import java.net.ServerSocket;
> > > import java.net.Socket;
> > > import java.net.SocketTimeoutException;
> > > import java.util.*;
> > > import java.util.function.Function;
> > > import java.util.stream.Collectors;
> > > import java.util.stream.StreamSupport;
> > >
> > > public class AlgebraTest {
> > >     @Test public void flinkCanJoinTwoStreams() throws Exception {
> > >         final List<Msg> inputs = list(
> > >                 Msg(9, "Right", "val", 1),
> > >                 Msg(10, "Left", "attr", 1),
> > >                 Msg(12, "Left", "attr", 2),
> > >                 Msg(14, "Left", "attr", 1),
> > >                 Msg(15, "Right", "val", 1),
> > >                 Msg(17, "Right", "val", 3));
> > >         final List<Msg> expected = list(
> > >                 Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> > >                         "Right:@t", 9, "Left:@t", 10),
> > >                 Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> > >                         "Right:@t", 15, "Left:@t", 14));
> > >         final List<Msg> output = runStreamAlg(inputs, source -> {
> > >             final DataStream<Msg> RightSource =
> > >                     source.filter(msg -> "Right".equals(msg.type));
> > >             final DataStream<Msg> LeftSource =
> > >                     source.filter(msg -> "Left".equals(msg.type));
> > >
> > >             // Join Left & Right streams on
> > >             // Left.attr == Right.val && abs(Left.t - Right.t) < 5
> > >             final DataStream<Msg> joined = LeftSource.union(RightSource)
> > >                     .keyBy(msg -> {
> > >                         if ("Right".equals(msg.type)) {
> > >                             return msg.attrs.get("val");
> > >                         } else if ("Left".equals(msg.type)) {
> > >                             return msg.attrs.get("attr");
> > >                         } else {
> > >                             throw new RuntimeException();
> > >                         }
> > >                     })
> > >                     .window(GlobalWindows.create())
> > >                     .trigger(CountTrigger.of(1))
> > >                     .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
> > >                         @Override
> > >                         public void apply(Object _key, GlobalWindow _w, Iterable<Msg> ins,
> > >                                           Collector<Msg> collector) throws Exception {
> > >                             List<Integer> times = StreamSupport.stream(ins.spliterator(), false)
> > >                                     .map(m -> m.timestamp)
> > >                                     .collect(Collectors.toList());
> > >
> > >                             collector.collect(Msg(times.stream().mapToInt(i -> i).min().getAsInt(),
> > >                                     "None", "times", times));
> > >                         }
> > >                     });
> > >
> > >             return joined;
> > >         });
> > >         assertEquals(expected, output);
> > >     }
> > >
> > >     private final List<Msg> runStreamAlg(List<Msg> input,
> > >                                          Function<DataStream<Msg>, DataStream<Msg>> fn) {
> > >         final StreamExecutionEnvironment env =
> > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >         final DataStream<Msg> source = env.fromCollection(input)
> > >                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
> > >                     @Override
> > >                     public long extractAscendingTimestamp(Msg msg) {
> > >                         return msg.timestamp * 1000;
> > >                     }
> > >                 });
> > >         final DataStream<Msg> transformed = fn.apply(source);
> > >
> > >         final List<Msg> res = new ArrayList<>();
> > >         try (final ServerSocket server = new ServerSocket(0)) {
> > >             final int serverPort = server.getLocalPort();
> > >
> > >             transformed.addSink(m -> {
> > >                 try (final Socket client = new Socket("localhost", serverPort)) {
> > >                     final ObjectOutputStream toServer =
> > >                             new ObjectOutputStream(client.getOutputStream());
> > >                     toServer.writeObject(m);
> > >                     toServer.flush();
> > >                     toServer.close();
> > >                 }
> > >             });
> > >
> > >             final Thread t = new Thread(() -> {
> > >                 while (true) {
> > >                     try (final ObjectInputStream in =
> > >                                  new ObjectInputStream(server.accept().getInputStream())) {
> > >                         res.add((Msg) in.readObject());
> > >                         server.setSoTimeout(500);
> > >                     } catch (SocketTimeoutException e) {
> > >                         return;
> > >                     } catch (java.io.IOException | ClassNotFoundException e) {
> > >                         throw new RuntimeException(e);
> > >                     }
> > >                 }
> > >             });
> > >             t.start();
> > >             try {
> > >                 env.execute();
> > >             } catch (Exception e) {
> > >                 e.printStackTrace();
> > >             }
> > >             t.join();
> > >         } catch (Exception e) {
> > >             throw new RuntimeException(e);
> > >         }
> > >         return res;
> > >     }
> > >
> > >     private static <T> List<T> list(T elem, T... others) {
> > >         final List<T> res = new ArrayList<>();
> > >         res.add(elem);
> > >         for(T t: others) {
> > >             res.add(t);
> > >         }
> > >         return res;
> > >     }
> > >
> > >     private static Msg Msg(int timestamp, String type, Object... attrs) {
> > >         return new Msg(timestamp, type, attrs);
> > >     }
> > >
> > >     private static class Msg implements Serializable {
> > >         private final String type;
> > >         private final int timestamp;
> > >         private final Map<String, Object> attrs;
> > >
> > >         public Msg(int timestamp, String type, Object... attrs) {
> > >             this.timestamp = timestamp;
> > >             this.type = type;
> > >             this.attrs = new HashMap<>();
> > >             if (attrs.length % 2 != 0) throw new IllegalArgumentException();
> > >             for (int i = 0; i < attrs.length; i += 2) {
> > >                 if (!(attrs[i] instanceof String)) throw new IllegalArgumentException();
> > >                 this.attrs.put((String) attrs[i], attrs[i + 1]);
> > >             }
> > >         }
> > >
> > >         // Value equality, so that assertEquals compares contents, not references.
> > >         @Override
> > >         public boolean equals(Object o) {
> > >             if (this == o) return true;
> > >             if (!(o instanceof Msg)) return false;
> > >             final Msg other = (Msg) o;
> > >             return timestamp == other.timestamp
> > >                     && Objects.equals(type, other.type)
> > >                     && attrs.equals(other.attrs);
> > >         }
> > >
> > >         @Override
> > >         public int hashCode() {
> > >             return Objects.hash(timestamp, type, attrs);
> > >         }
> > >
> > >         @Override
> > >         public String toString() {
> > >             return String.format("[%d \"%s\" {%s}]",
> > >                     this.timestamp,
> > >                     this.type,
> > >                     this.attrs.entrySet().stream()
> > >                             .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey()))
> > >                             .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
> > >                             .reduce((acc, el) -> acc + ", " + el)
> > >                             .orElseGet(() -> ""));
> > >         }
> > >     }
> > > }
> > >
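As a sanity check on the expected list, the join condition from the test's comment (Left.attr == Right.val && abs(Left.t - Right.t) < 5) can be evaluated offline over the six input messages with a nested-loop join, which does not depend on arrival order at all. This is a plain-Java sketch, not a Flink job; JoinPredicateCheck and its Event class are hypothetical stand-ins for Msg, and each result is stamped with the later timestamp of its pair, which matches both entries of expected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Offline check of the test's join condition, independent of arrival order.
// Event is a hypothetical stand-in for Msg (timestamp + attr/val); not a Flink job.
class JoinPredicateCheck {

    static final class Event {
        final int t;
        final int key; // Left.attr or Right.val

        Event(int t, int key) {
            this.t = t;
            this.key = key;
        }
    }

    // Nested-loop join on left.key == right.key && |left.t - right.t| < maxGap.
    static List<String> join(List<Event> left, List<Event> right, int maxGap) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key == r.key && Math.abs(l.t - r.t) < maxGap) {
                    // Stamp the pair with the later of the two timestamps.
                    out.add("t=" + Math.max(l.t, r.t) + " Left@" + l.t + " Right@" + r.t);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event(10, 1), new Event(12, 2), new Event(14, 1));
        List<Event> right = Arrays.asList(new Event(9, 1), new Event(15, 1), new Event(17, 3));
        System.out.println(join(left, right, 5));
        // [t=10 Left@10 Right@9, t=15 Left@14 Right@15]
    }
}
```

The pairs (Left 10, Right 9) and (Left 14, Right 15) are found, while (Left 10, Right 15) and (Left 14, Right 9) are excluded because their gap is exactly 5 and the comparison is strict.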
> >
>
