[ 
https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691960#comment-16691960
 ] 

ASF GitHub Bot commented on FLINK-8997:
---------------------------------------

dawidwys closed pull request #7039: [FLINK-8997] Added sliding window 
aggregation to datastream test job
URL: https://github.com/apache/flink/pull/7039
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 8bd649cc5eb..4f69cdb4e04 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -38,6 +39,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
@@ -86,6 +88,10 @@
  *     <li>sequence_generator_source.event_time.clock_progress (long, default 
- 100): The amount of event time to progress per event generated by the 
sequence generator.</li>
  *     <li>tumbling_window_operator.num_events (long, default - 20L): The 
duration of the window, indirectly determined by the target number of events in 
each window.
  *         Total duration is (sliding_window_operator.num_events) * 
(sequence_generator_source.event_time.clock_progress).</li>
+ *     <li>test_slide_factor (int, default - 3): how many slides there are in a
+ *         single window (in other words, at most how many windows may be
+ *         open at a time for a given key). The length of a window will be
+ *         calculated as (test_slide_size) * (test_slide_factor)</li>
+ *     <li>test_slide_size (long, default - 250): length of a slide of sliding 
window in milliseconds. The length of a window will be calculated as 
(test_slide_size) * (test_slide_factor)</li>
  * </ul>
  */
 public class DataStreamAllroundTestJobFactory {
@@ -201,6 +207,14 @@
                .key("tumbling_window_operator.num_events")
                .defaultValue(20L);
 
+       private static final ConfigOption<Integer> TEST_SLIDE_FACTOR = 
ConfigOptions
+               .key("test_slide_factor")
+               .defaultValue(3);
+
+       private static final ConfigOption<Long> TEST_SLIDE_SIZE = ConfigOptions
+               .key("test_slide_size")
+               .defaultValue(250L);
+
        public static void setupEnvironment(StreamExecutionEnvironment env, 
ParameterTool pt) throws Exception {
 
                // set checkpointing semantics
@@ -455,4 +469,24 @@ static boolean isSimulateFailures(ParameterTool pt) {
                        listStateGenerator,
                        listStateDescriptor);
        }
+
+       static SlidingEventTimeWindows createSlidingWindow(ParameterTool pt) {
+               long slideSize = pt.getLong(
+                       TEST_SLIDE_SIZE.key(),
+                       TEST_SLIDE_SIZE.defaultValue());
+
+               long slideFactor = pt.getInt(
+                       TEST_SLIDE_FACTOR.key(),
+                       TEST_SLIDE_FACTOR.defaultValue()
+               );
+
+               return SlidingEventTimeWindows.of(Time.milliseconds(slideSize * 
slideFactor), Time.milliseconds(slideSize));
+       }
+
+       static FlatMapFunction<Tuple2<Integer, List<Event>>, String> 
createSlidingWindowCheckMapper(ParameterTool pt) {
+               return new SlidingWindowCheckMapper(pt.getInt(
+                       TEST_SLIDE_FACTOR.key(),
+                       TEST_SLIDE_FACTOR.defaultValue()
+               ));
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 3c406c7598d..1f2758ddc36 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.formats.avro.typeutils.AvroSerializer;
@@ -36,6 +37,9 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
@@ -43,6 +47,8 @@
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindow;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindowCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
@@ -69,6 +75,8 @@
        private static final String TIME_WINDOW_OPER_NAME = 
"TumblingWindowOperator";
        private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
        private static final String FAILURE_MAPPER_NAME = "FailureMapper";
+       private static final String SLIDING_WINDOW_CHECK_MAPPER_NAME = 
"SlidingWindowCheckMapper";
+       private static final String SLIDING_WINDOW_AGG_NAME = 
"SlidingWindowOperator";
 
        public static void main(String[] args) throws Exception {
                final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -153,8 +161,34 @@ public void apply(Integer integer, TimeWindow window, 
Iterable<Event> input, Col
                eventStream3.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
                        .name(SEMANTICS_CHECK_MAPPER_NAME)
-                       .uid("0007")
-                       .addSink(new PrintSinkFunction<>()).uid("0008");
+                       .uid("007")
+                       .addSink(new PrintSinkFunction<>())
+                       .uid("008");
+
+               // Check sliding windows aggregations. Output all elements 
assigned to a window and later on
+               // check if each event was emitted slide_factor number of times
+               DataStream<Tuple2<Integer, List<Event>>> eventStream4 = 
eventStream2.keyBy(Event::getKey)
+                       .window(createSlidingWindow(pt))
+                       .apply(new WindowFunction<Event, Tuple2<Integer, 
List<Event>>, Integer, TimeWindow>() {
+                               private static final long serialVersionUID = 
3166250579972849440L;
+
+                               @Override
+                               public void apply(
+                                       Integer key, TimeWindow window, 
Iterable<Event> input,
+                                       Collector<Tuple2<Integer, List<Event>>> 
out) throws Exception {
+
+                                       out.collect(Tuple2.of(key, 
StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
+                               }
+                       })
+                       .name(SLIDING_WINDOW_AGG_NAME)
+                       .uid("009");
+
+               eventStream4.keyBy(events-> events.f0)
+                       .flatMap(createSlidingWindowCheckMapper(pt))
+                       .uid("010")
+                       .name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
+                       .addSink(new PrintSinkFunction<>())
+                       .uid("011");
 
                env.execute("General purpose test job");
        }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
index 8c219ed5863..af7a0ad6a6f 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.tests;
 
+import java.util.Objects;
+
 /**
  * The event type of records used in the {@link DataStreamAllroundTestProgram}.
  */
@@ -51,6 +53,26 @@ public String getPayload() {
                return payload;
        }
 
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               Event event = (Event) o;
+               return key == event.key &&
+                       eventTime == event.eventTime &&
+                       sequenceNumber == event.sequenceNumber &&
+                       Objects.equals(payload, event.payload);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(key, eventTime, sequenceNumber, payload);
+       }
+
        @Override
        public String toString() {
                return "Event{" +
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
index b67e2823342..780e2aecd3e 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
@@ -34,7 +34,7 @@
        private static final long serialVersionUID = -744070793650644485L;
 
        /** This value state tracks the current sequence number per key. */
-       private volatile ValueState<Long> sequenceValue;
+       private transient ValueState<Long> sequenceValue;
 
        /** This defines how semantics are checked for each update. */
        private final ValidatorFunction validator;
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java
new file mode 100644
index 00000000000..8d3242efda5
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SlidingWindowCheckMapper.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This mapper validates a sliding event time window. It checks that each event
+ * belongs to the appropriate number of consecutive windows.
+ */
+public class SlidingWindowCheckMapper extends 
RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
+
+       private static final long serialVersionUID = -744070793650644485L;
+
+       /** This value state tracks previously seen events with the number of 
windows they appeared in. */
+       private transient ValueState<List<Tuple2<Event, Integer>>> 
eventsSeenSoFar;
+
+       private transient ValueState<Long> lastSequenceNumber;
+
+       private final int slideFactor;
+
+       SlidingWindowCheckMapper(int slideFactor) {
+               this.slideFactor = slideFactor;
+       }
+
+       @Override
+       public void open(Configuration parameters) {
+               ValueStateDescriptor<List<Tuple2<Event, Integer>>> 
previousWindowDescriptor =
+                       new ValueStateDescriptor<>("eventsSeenSoFar",
+                               new ListTypeInfo<>(new 
TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
+
+               eventsSeenSoFar = 
getRuntimeContext().getState(previousWindowDescriptor);
+
+               ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
+                       new ValueStateDescriptor<>("lastSequenceNumber", 
BasicTypeInfo.LONG_TYPE_INFO);
+
+               lastSequenceNumber = 
getRuntimeContext().getState(lastSequenceNumberDescriptor);
+       }
+
+       @Override
+       public void flatMap(Tuple2<Integer, List<Event>> value, 
Collector<String> out) throws Exception {
+               List<Tuple2<Event, Integer>> previousWindowValues = 
Optional.ofNullable(eventsSeenSoFar.value()).orElseGet(
+                       Collections::emptyList);
+
+               List<Event> newValues = value.f1;
+               Optional<Event> lastEventInWindow = 
verifyWindowContiguity(newValues, out);
+
+               Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value();
+               List<Tuple2<Event, Integer>> newWindows =
+                       verifyPreviousOccurences(previousWindowValues, 
newValues, lastSequenceNumberSeenSoFar, out);
+
+               if (lastEventInWindow.isPresent()) {
+                       updateLastSeenSequenceNumber(lastEventInWindow.get(), 
lastSequenceNumberSeenSoFar, out);
+               }
+
+               eventsSeenSoFar.update(newWindows);
+       }
+
+       private void updateLastSeenSequenceNumber(
+                       Event lastEventInWindow,
+                       Long lastSequenceNumberSeenSoFar,
+                       Collector<String> out) throws IOException {
+               long lastSequenceNumberInWindow = 
lastEventInWindow.getSequenceNumber();
+               if (lastSequenceNumberSeenSoFar == null || 
lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {
+                       lastSequenceNumber.update(lastSequenceNumberInWindow);
+               } else if (lastSequenceNumberInWindow < 
lastSequenceNumberSeenSoFar) {
+                       failWithSequenceNumberDecreased(lastEventInWindow, 
lastSequenceNumberSeenSoFar, out);
+               }
+       }
+
+       private void failWithSequenceNumberDecreased(
+                       Event lastEventInWindow,
+                       Long lastSequenceNumberSeenSoFar,
+                       Collector<String> out) {
+               out.collect(String.format("Last event in current window (%s) 
has sequence number lower than seen so far (%d)",
+                       lastEventInWindow,
+                       lastSequenceNumberSeenSoFar));
+       }
+
+       /**
+        * Verifies if all values from previous windows appear in the new one. 
Returns union of all events seen so far that
+        * were not seen <b>slideFactor</b> number of times yet.
+        */
+       private List<Tuple2<Event, Integer>> verifyPreviousOccurences(
+                       List<Tuple2<Event, Integer>> previousWindowValues,
+                       List<Event> newValues,
+                       Long lastSequenceNumberSeenSoFar,
+                       Collector<String> out) {
+               List<Tuple2<Event, Integer>> newEventsSeenSoFar = new 
ArrayList<>();
+               List<Event> seenEvents = new ArrayList<>();
+
+               for (Tuple2<Event, Integer> windowValue : previousWindowValues) 
{
+                       if (!newValues.contains(windowValue.f0)) {
+                               failWithEventNotSeenAlertMessage(windowValue, 
newValues, out);
+                       } else {
+                               seenEvents.add(windowValue.f0);
+                               
preserveOrDiscardIfSeenSlideFactorTimes(newEventsSeenSoFar, windowValue);
+                       }
+               }
+
+               addNotSeenValues(newEventsSeenSoFar, newValues, seenEvents, 
lastSequenceNumberSeenSoFar, out);
+
+               return newEventsSeenSoFar;
+       }
+
+       private void addNotSeenValues(
+                       List<Tuple2<Event, Integer>> newEventsSeenSoFar,
+                       List<Event> newValues,
+                       List<Event> seenValues,
+                       Long lastSequenceNumberSeenSoFar,
+                       Collector<String> out) {
+               newValues.stream()
+                       .filter(e -> !seenValues.contains(e))
+                       .forEach(e -> {
+                               if (lastSequenceNumberSeenSoFar == null || 
e.getSequenceNumber() > lastSequenceNumberSeenSoFar) {
+                                       newEventsSeenSoFar.add(Tuple2.of(e, 1));
+                               } else {
+                                       failWithEventSeenTooManyTimesMessage(e, 
out);
+                               }
+                       });
+       }
+
+       private void failWithEventSeenTooManyTimesMessage(Event e, 
Collector<String> out) {
+               out.collect(String.format("Alert: event %s seen more than %d 
times", e, slideFactor));
+       }
+
+       private void preserveOrDiscardIfSeenSlideFactorTimes(
+                       List<Tuple2<Event, Integer>> newEventsSeenSoFar,
+                       Tuple2<Event, Integer> windowValue) {
+               int timesSeen = windowValue.f1 + 1;
+               if (timesSeen != slideFactor) {
+                       newEventsSeenSoFar.add(Tuple2.of(windowValue.f0, timesSeen));
+               }
+       }
+
+       private void failWithEventNotSeenAlertMessage(
+                       Tuple2<Event, Integer> previousWindowValue,
+                       List<Event> currentWindowValues,
+                       Collector<String> out) {
+               out.collect(String.format(
+                       "Alert: event %s did not belong to %d consecutive 
windows. " +
+                               "Event seen so far %d times. Current window: %s",
+                       previousWindowValue.f0,
+                       slideFactor,
+                       previousWindowValue.f1,
+                       currentWindowValues));
+       }
+
+       private Optional<Event> verifyWindowContiguity(List<Event> newValues, 
Collector<String> out) {
+               return newValues.stream()
+                       
.sorted(Comparator.comparingLong(Event::getSequenceNumber))
+                       .reduce((event, event2) -> {
+                               if (event2.getSequenceNumber() - 1 != 
event.getSequenceNumber()) {
+                                       out.collect("Alert: events in window out of order!");
+                               }
+
+                               return event2;
+                       });
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add sliding window aggregation to the job
> -----------------------------------------
>
>                 Key: FLINK-8997
>                 URL: https://issues.apache.org/jira/browse/FLINK-8997
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Stefan Richter
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> The test job should also test windowing. Sliding windows are probably the 
> most demanding form, so this would be a good pick for the test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to