Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r50390580
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
---
@@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws
Exception {
join.close();
this.memManager.release(join.getFreedMemory());
}
+
+ @Test
+ public void testHashWithBuildSideOuterJoin1() throws Exception {
+ final int NUM_KEYS = 20000;
+ final int BUILD_VALS_PER_KEY = 1;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // create a build input that gives 40000 pairs with 1 values
sharing the same key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs with 1 values
sharing a key
+ MutableObjectIterator<IntPair> probeInput = new
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform
hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER,
33);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ final MutableHashTable<IntPair, IntPair> join = new
MutableHashTable<IntPair, IntPair>(
+ this.pairBuildSideAccesssor,
this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator,
this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildInput, probeInput, true);
+
+ final IntPair recordReuse = new IntPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableObjectIterator<IntPair> buildSide =
join.getBuildSideIterator();
+ while (buildSide.next(recordReuse) != null) {
+ numRecordsInJoinResult++;
+ }
+ }
+ Assert.assertEquals("Wrong number of records in join result.",
2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+ join.close();
+ this.memManager.release(join.getFreedMemory());
+ }
+
+ @Test
+ public void testHashWithBuildSideOuterJoin2() throws Exception {
+ final int NUM_KEYS = 40000;
+ final int BUILD_VALS_PER_KEY = 2;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // The keys of probe and build sides are overlapped, so there
would be none unmatched build elements
+ // after probe phase.
+
+ // create a build input that gives 80000 pairs with 2 values
sharing the same key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs with 1 values
sharing a key
--- End diff --
should be 40000 pairs
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---