Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1469#discussion_r50390858 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java --- @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception { join.close(); this.memManager.release(join.getFreedMemory()); } + + @Test + public void testHashWithBuildSideOuterJoin1() throws Exception { + final int NUM_KEYS = 20000; + final int BUILD_VALS_PER_KEY = 1; + final int PROBE_VALS_PER_KEY = 1; + + // create a build input that gives 40000 pairs with 1 values sharing the same key + MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 20000 pairs with 1 values sharing a key + MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true); + + // allocate the memory for the HashTable + List<MemorySegment> memSegments; + try { + // 33 is minimum number of pages required to perform hash join this inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, 33); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager); + join.open(buildInput, probeInput, true); + + final IntPair recordReuse = new IntPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator(); + while (buildSide.next(recordReuse) != null) { + numRecordsInJoinResult++; + } + } + Assert.assertEquals("Wrong number 
of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult); + + join.close(); + this.memManager.release(join.getFreedMemory()); + } + + @Test + public void testHashWithBuildSideOuterJoin2() throws Exception { + final int NUM_KEYS = 40000; + final int BUILD_VALS_PER_KEY = 2; + final int PROBE_VALS_PER_KEY = 1; + + // The keys of probe and build sides are overlapped, so there would be none unmatched build elements + // after probe phase. + + // create a build input that gives 40000 pairs with 2 values sharing the same key + MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 20000 pairs with 1 values sharing a key + MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true); + + // allocate the memory for the HashTable + List<MemorySegment> memSegments; + try { + // 33 is minimum number of pages required to perform hash join this inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, 33); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager); + join.open(buildInput, probeInput, true); + + final IntPair recordReuse = new IntPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator(); + IntPair next = buildSide.next(recordReuse); --- End diff -- OK, maybe add a brief comment to make clear which behavior is tested.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---