Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5342#discussion_r163506497
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperatorTest.java
---
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import
org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+// TODO: Parameterize to use different state backends --> This would
require circular dependency on flink rocksdb
+@RunWith(Parameterized.class)
+public class TimeBoundedStreamJoinOperatorTest {
+
+ private final boolean lhsFasterThanRhs;
+
+ @Parameters(name = "lhs faster than rhs stream: {0}")
+ public static Boolean[] data() {
+ return new Boolean[]{true, false};
+ }
+
+ public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
+ this.lhsFasterThanRhs = lhsFasterThanRhs;
+ }
+
+ @Test // lhs - 2 <= rhs <= rhs + 2
+ public void testNegativeInclusiveAndNegativeInclusive() throws
Exception {
+
+ long lowerBound = -2;
+ boolean lowerBoundInclusive = true;
+
+ long upperBound = -1;
+ boolean upperBoundInclusive = true;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+
+ testHarness.setup();
+ testHarness.open();
+
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>>
expectedOutput = Lists.newArrayList(
+ streamRecordOf(2, 1),
+ streamRecordOf(3, 1),
+ streamRecordOf(3, 2),
+ streamRecordOf(4, 2),
+ streamRecordOf(4, 3)
+ );
+
+ assertOutput(expectedOutput, testHarness.getOutput());
+ ensureNoLateData(testHarness.getOutput());
+ }
+ }
+
+ @Test // lhs - 1 <= rhs <= rhs + 1
+ public void testNegativeInclusiveAndPositiveInclusive() throws
Exception {
+
+ long lowerBound = -1;
+ boolean lowerBoundInclusive = true;
+
+ long upperBound = 1;
+ boolean upperBoundInclusive = true;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>>
expectedOutput = Lists.newArrayList(
+ streamRecordOf(1, 1),
+ streamRecordOf(1, 2),
+ streamRecordOf(2, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(2, 3),
+ streamRecordOf(3, 2),
+ streamRecordOf(3, 3),
+ streamRecordOf(3, 4),
+ streamRecordOf(4, 3),
+ streamRecordOf(4, 4)
+ );
+
+ ConcurrentLinkedQueue<Object> output =
testHarness.getOutput();
+
+ assertOutput(expectedOutput, testHarness.getOutput());
+ ensureNoLateData(output);
+
+ }
+ }
+
+ @Test // lhs + 1 <= rhs <= lhs + 2
+ public void testPositiveInclusiveAndPositiveInclusive() throws
Exception {
+ long lowerBound = 1;
+ long upperBound = 2;
+
+ boolean lowerBoundInclusive = true;
+ boolean upperBoundInclusive = true;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>> expected
= Lists.newArrayList(
+ streamRecordOf(1, 2),
+ streamRecordOf(1, 3),
+ streamRecordOf(2, 3),
+ streamRecordOf(2, 4),
+ streamRecordOf(3, 4)
+ );
+
+ assertOutput(expected, testHarness.getOutput());
+ ensureNoLateData(testHarness.getOutput());
+ }
+ }
+
+ @Test
+ public void testNegativeExclusiveAndNegativeExlusive() throws Exception
{
+ long lowerBound = -3;
+ boolean lowerBoundInclusive = false;
+
+ long upperBound = -1;
+ boolean upperBoundInclusive = false;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+ testHarness.setup();
+ testHarness.open();
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>>
expectedOutput = Lists.newArrayList(
+ streamRecordOf(3, 1),
+ streamRecordOf(4, 2)
+ );
+
+ ConcurrentLinkedQueue<Object> output =
testHarness.getOutput();
+
+ assertOutput(expectedOutput, testHarness.getOutput());
+ ensureNoLateData(output);
+ }
+ }
+
+ @Test
+ public void testNegativeExclusiveAndPositiveExlusive() throws Exception
{
+ long lowerBound = -1;
+ boolean lowerBoundInclusive = false;
+
+ long upperBound = 1;
+ boolean upperBoundInclusive = false;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+ testHarness.setup();
+ testHarness.open();
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>>
expectedOutput = Lists.newArrayList(
+ streamRecordOf(1, 1),
+ streamRecordOf(2, 2),
+ streamRecordOf(3, 3),
+ streamRecordOf(4, 4)
+ );
+
+ ConcurrentLinkedQueue<Object> output =
testHarness.getOutput();
+
+ assertOutput(expectedOutput, testHarness.getOutput());
+ ensureNoLateData(output);
+ }
+ }
+
+ @Test
+ public void testPositiveExclusiveAndPositiveExlusive() throws Exception
{
+ long lowerBound = 1;
+ boolean lowerBoundInclusive = false;
+
+ long upperBound = 3;
+ boolean upperBoundInclusive = false;
+
+ try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem,
TestElem, Tuple2<TestElem, TestElem>> testHarness
+ = createTestHarness(lowerBound,
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
+
+ testHarness.setup();
+ testHarness.open();
+ prepareTestHarness(testHarness);
+
+ List<StreamRecord<Tuple2<TestElem, TestElem>>>
expectedOutput = Lists.newArrayList(
+ streamRecordOf(1, 3),
+ streamRecordOf(2, 4)
+ );
+
+ ConcurrentLinkedQueue<Object> output =
testHarness.getOutput();
+
+ assertOutput(expectedOutput, testHarness.getOutput());
+ ensureNoLateData(output);
+
+ }
+ }
+
+ @Test
+ public void stateGetsCleanedWhenNotNeeded() throws Exception {
+
+ long lowerBound = 1;
+ boolean lowerBoundInclusive = true;
+
+ long upperBound = 2;
+ boolean upperBoundInclusive = true;
+
+ TimeBoundedStreamJoinOperator<TestElem, TestElem> operator =
new TimeBoundedStreamJoinOperator<>(
+ lowerBound,
--- End diff --
Here, and wherever we instantiate an operator, we should now add the
serializers for the input elements. This can be done with the
`TypeInformation.of(new TypeHint<TestElem>() {}).createSerializer(new
ExecutionConfig())`.
---