Hi list,

I am surprised by the behaviour of the code below. In particular, I am
puzzled by the fact that events do not seem to enter the window in order.
What am I doing wrong?

Here's what I don't understand. This test outputs the following error:

java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10,
"Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
{"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
{"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>

Now, the test is not complete, so it's not surprising that it fails, but
what really puzzles me is that there appears to be a moment when my window
contains an event at time 9 and an event at time 15, but does not yet
include the events at times 10 and 14, which should be part of the same
stream (and are indeed added later).

This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
relevant parts of the pom.xml uncommented.

package enx.cep;
import static org.junit.Assert.assertEquals;
import org.apache.flink.streaming.api.TimeCharacteristic;import
org.apache.flink.streaming.api.datastream.DataStream;import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import
org.apache.flink.streaming.api.functions.windowing.WindowFunction;import
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import
org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import
org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import
org.apache.flink.util.Collector;import org.junit.Test;
import java.io.*;import java.net.ServerSocket;import
java.net.Socket;import java.net.SocketTimeoutException;import
java.util.*;
import java.util.function.Function;import
java.util.stream.Collectors;import java.util.stream.StreamSupport;
public class AlgebraTest {
    @Test public void flinkCanJoinTwoStreams() throws Exception {
        final List<Msg> inputs = list(
                Msg(9, "Right", "val", 1),
                Msg(10, "Left", "attr", 1),
                Msg(12, "Left", "attr", 2),
                Msg(14, "Left", "attr", 1),
                Msg(15, "Right", "val", 1),
                Msg(17, "Right", "val", 3));
        final List<Msg> expected = list(
                Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
                        "Right:@t", 9, "Left:@t", 10),
                Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
                        "Right:@t", 15, "Left:@t", 14));
        final List<Msg> output = runStreamAlg(inputs, source -> {
            final DataStream<Msg> RightSource = source.filter(msg ->
"Right".equals(msg.type));
            final DataStream<Msg> LeftSource = source.filter(msg ->
"Left".equals(msg.type));

            // Join Left & Right streams on
            // Left.attr == Right.val && abs(Left.t - Right.t) < 5
            final DataStream<Msg> joined = LeftSource.union(RightSource)
                    .keyBy(msg -> {
                        if ("Right".equals(msg.type)) {
                            return msg.attrs.get("val");
                        } else if ("Left".equals(msg.type)) {
                            return msg.attrs.get("attr");
                        } else {
                            throw new RuntimeException();
                        }
                    })
                    .window(GlobalWindows.create())
                    .trigger(CountTrigger.of(1))
                    .apply(new WindowFunction<Msg, Msg, Object,
GlobalWindow>() {
                        @Override
                        public void apply(Object _key, GlobalWindow
_w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception {
                            List<Integer> times =
StreamSupport.stream(ins.spliterator(), false)
                                    .map(m -> m.timestamp)
                                    .collect(Collectors.toList());

collector.collect(Msg(times.stream().mapToInt(i ->
i).min().getAsInt(),
                                    "None", "times", times));
                        }
                    });

            return joined;
        });
        assertEquals(expected, output);
    }

    private final List<Msg> runStreamAlg(List<Msg> input,
Function<DataStream<Msg>, DataStream<Msg>> fn) {
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final DataStream<Msg> source = env.fromCollection(input)
                .assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Msg>() {
                    @Override
                    public long extractAscendingTimestamp(Msg msg) {
                        return msg.timestamp * 1000;
                    }
                });
        final DataStream<Msg> transformed = fn.apply(source);

        final List<Msg> res = new ArrayList<>();
        try (final ServerSocket server = new ServerSocket(0)) {
            final int serverPort = server.getLocalPort();

            transformed.addSink(m -> {
                try (final Socket client = new Socket("localhost",
serverPort)) {
                    final ObjectOutputStream toServer = new
ObjectOutputStream(client.getOutputStream());
                    toServer.writeObject(m);
                    toServer.flush();
                    toServer.close();
                }
            });

            final Thread t = new Thread(() -> {
                while (true) {
                    try (final ObjectInputStream in = new
ObjectInputStream(server.accept().getInputStream())) {
                        res.add((Msg) in.readObject());
                        server.setSoTimeout(500);
                    } catch (SocketTimeoutException e) {
                        return;
                    } catch (java.io.IOException | ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            t.start();
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
            t.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    private static <T> List<T> list(T elem, T... others) {
        final List<T> res = new ArrayList<>();
        res.add(elem);
        for(T t: others) {
            res.add(t);
        }
        return res;
    }

    private static Msg Msg(int timestamp, String type, Object... attrs) {
        return new Msg(timestamp, type, attrs);
    }

    private static class Msg implements Serializable {
        private final String type;
        private final int timestamp;
        private final Map<String, Object> attrs;
        public Msg(int timestamp, String type, Object... attrs) {
            this.timestamp = timestamp;
            this.type = type;
            this.attrs = new HashMap<>();
            if (attrs.length % 2 != 0) throw new IllegalArgumentException();
            for (int i = 0; i < attrs.length; i += 2) {
                if (!(attrs[i] instanceof String)) throw new
IllegalArgumentException();
                this.attrs.put((String) attrs[i], attrs[i+1]);
            }
        }

        public String toString() {
            return String.format("[%d \"%s\" {%s}]",
                    this.timestamp,
                    this.type,
                    this.attrs.entrySet().stream()
                            .sorted((e1, e2) ->
e1.getKey().compareTo(e2.getKey()))
                            .map(e -> String.format("\"%s\": %s",
e.getKey(), e.getValue()))
                            .reduce((acc, el) -> acc + ", " + el)
                            .orElseGet(() -> ""));
        }
    }}

Reply via email to