[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612219#comment-15612219
 ] 

ASF GitHub Bot commented on FLINK-4552:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2572#discussion_r85361208
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 ---
    @@ -0,0 +1,369 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators.windowing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.MergingState;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
    +import org.apache.flink.runtime.query.KvStateRegistry;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateBackend;
    +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +
    +/**
    + * Utility for testing {@link Trigger} behaviour.
    + */
    +public class TriggerTestHarness<T, W extends Window> {
    +
    +   private static final Integer KEY = 1;
    +
    +   private final Trigger<T, W> trigger;
    +   private final TypeSerializer<W> windowSerializer;
    +
    +   private final HeapKeyedStateBackend<Integer> stateBackend;
    +   private final TestInternalTimerService<Integer, W> internalTimerService;
    +
    +   public TriggerTestHarness(
    +                   Trigger<T, W> trigger,
    +                   TypeSerializer<W> windowSerializer) throws Exception {
    +           this.trigger = trigger;
    +           this.windowSerializer = windowSerializer;
    +
    +           // we only ever use one key, other tests make sure that windows 
work across different
    +           // keys
    +           DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
    +           MemoryStateBackend backend = new MemoryStateBackend();
    +
    +           @SuppressWarnings("unchecked")
    +           HeapKeyedStateBackend<Integer> stateBackend = 
(HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
    +                           new JobID(),
    +                           "test_op",
    +                           IntSerializer.INSTANCE,
    +                           1,
    +                           new KeyGroupRange(0, 0),
    +                           new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
    +           this.stateBackend = stateBackend;
    +
    +           this.stateBackend.setCurrentKey(0);
    +
    +           this.internalTimerService = new TestInternalTimerService<>(new 
KeyContext() {
    +                   @Override
    +                   public void setCurrentKey(Object key) {
    +                           // ignore
    +                   }
    +
    +                   @Override
    +                   public Object getCurrentKey() {
    +                           return KEY;
    +                   }
    +           });
    +   }
    +
    +   public int numProcessingTimeTimers() {
    +           return internalTimerService.numProcessingTimeTimers();
    +   }
    +
    +   public int numProcessingTimeTimers(W window) {
    +           return internalTimerService.numProcessingTimeTimers(window);
    +   }
    +
    +   public int numEventTimeTimers() {
    +           return internalTimerService.numEventTimeTimers();
    +   }
    +
    +   public int numEventTimeTimers(W window) {
    +           return internalTimerService.numEventTimeTimers(window);
    +   }
    +
    +   public int numStateEntries() {
    +           return stateBackend.numStateEntries();
    +   }
    +
    +   public int numStateEntries(W window) {
    +           return stateBackend.numStateEntries(window);
    +   }
    +
    +   /**
    +    * Injects one element into the trigger for the given window and 
returns the result of
    +    * {@link Trigger#onElement(Object, long, Window, 
Trigger.TriggerContext)}
    +    */
    +   public TriggerResult processElement(StreamRecord<T> element, W window) 
throws Exception {
    +           TestTriggerContext<Integer, W> triggerContext = new 
TestTriggerContext<>(
    +                           KEY,
    +                           window,
    +                           internalTimerService,
    +                           stateBackend,
    +                           windowSerializer);
    +           return trigger.onElement(element.getValue(), 
element.getTimestamp(), window, triggerContext);
    +   }
    +
    +   /**
    +    * Advances processing time and checks whether we have exactly one 
firing for the given
    +    * window. The result of {@link Trigger#onProcessingTime(long, Window, 
Trigger.TriggerContext)}
    +    * is returned for that firing.
    +    */
    +   public TriggerResult advanceProcessingTime(long time, W window) throws 
Exception {
    +           Collection<Tuple2<W, TriggerResult>> firings = 
advanceProcessingTime(time);
    +
    +           if (firings.size() != 1) {
    +                   throw new IllegalStateException("Must have exactly one 
timer firing. Fired timers: " + firings);
    +           }
    +
    +           Tuple2<W, TriggerResult> firing = firings.iterator().next();
    --- End diff --
    
    I don't think this checks EXACTLY one, but only AT LEAST one.


> Refactor WindowOperator/Trigger Tests
> -------------------------------------
>
>                 Key: FLINK-4552
>                 URL: https://issues.apache.org/jira/browse/FLINK-4552
>             Project: Flink
>          Issue Type: Improvement
>          Components: Windowing Operators
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and 
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these 
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and 
> {{WindowFunction}} produces the expected output.
> We should modularize these tests and spread them out across multiple files, 
> possibly one per trigger, for the triggers. Also, we should extend/change the 
> tests in some key ways:
>  - {{WindowOperatorTest}} should just verify that the interaction 
> between {{WindowOperator}} and the various other parts works as expected, 
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at 
> the expected time and that snapshotting, timers, cleanup etc. work correctly. 
> These tests should also verify that the different state types and 
> {{WindowFunctions}} work correctly.
>  - {{Trigger}} tests should present elements to triggers and verify that they 
> fire at the correct times. The actual output of the {{WindowFunction}} is not 
> important for these tests. We should also test that triggers correctly clean 
> up state and timers.
>  - {{WindowAssigner}} tests should test each window assigner and also verify 
> that, for example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}}, but it is not used by tests. I 
> think we can expand on that and provide more thorough test coverage while 
> also making the tests more maintainable ({{WindowOperatorTest.java}} is 
> nearing 3000 lines of code).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to