http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java new file mode 100644 index 0000000..f69b4d7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.junit.Assert; + +import java.util.Collection; +import java.util.Map; + + +public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> { + private static final long serialVersionUID = 1L; + + private final Map<Integer, Collection<Match>> toRemoveFrom; + + public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) { + this.toRemoveFrom = map; + } + + @Override + public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception { + final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0); + final String value1 = rec1 != null ? (String) rec1.getField(1) : null; + final String value2 = rec2 != null ? (String) rec2.getField(1) : null; + + Collection<Match> matches = this.toRemoveFrom.get(key); + if (matches == null) { + Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); + } + + boolean contained = matches.remove(new Match(value1, value2)); + if (!contained) { + Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2); + } + if (matches.isEmpty()) { + this.toRemoveFrom.remove(key); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java new file mode 100644 index 0000000..06a62e5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Collector; + +/** + * Simple flat join function that joins two binary tuples and considers null cases. + */ +public class SimpleTupleJoinFunction implements FlatJoinFunction<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> { + + @Override + public void join(Tuple2<String, String> first, Tuple2<String, Integer> second, Collector<Tuple4<String, String, String, Object>> out) throws Exception { + if (first == null) { + out.collect(new Tuple4<String, String, String, Object>(null, null, second.f0, second.f1)); + } else if (second == null) { + out.collect(new Tuple4<String, String, String, Object>(first.f0, first.f1, null, null)); + } else { + out.collect(new Tuple4<String, String, String, Object>(first.f0, first.f1, second.f0, second.f1)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index 5d1ce7f..38d9992 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator; -import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator; +import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; import org.apache.flink.runtime.operators.testutils.DummyInvokable; @@ -143,8 +143,8 @@ public class HashVsSortMiniBenchmark { final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator(); // compare with iterator values - ReusingMergeMatchIterator<Record, Record, Record> iterator = - new ReusingMergeMatchIterator<Record, Record, Record>(sortedInput1, sortedInput2, + ReusingMergeInnerJoinIterator<Record, Record, Record> iterator = + new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2, this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11, this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);