[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110465#comment-15110465 ]
ASF GitHub Bot commented on FLINK-2871: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1469#discussion_r50390580 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java --- @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception { join.close(); this.memManager.release(join.getFreedMemory()); } + + @Test + public void testHashWithBuildSideOuterJoin1() throws Exception { + final int NUM_KEYS = 20000; + final int BUILD_VALS_PER_KEY = 1; + final int PROBE_VALS_PER_KEY = 1; + + // create a build input that gives 40000 pairs with 1 values sharing the same key + MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 20000 pairs with 1 values sharing a key + MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true); + + // allocate the memory for the HashTable + List<MemorySegment> memSegments; + try { + // 33 is minimum number of pages required to perform hash join this inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, 33); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager); + join.open(buildInput, probeInput, true); + + final IntPair recordReuse = new IntPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator(); + while (buildSide.next(recordReuse) != null) { + numRecordsInJoinResult++; + } + } + Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult); + + join.close(); + this.memManager.release(join.getFreedMemory()); + } + + @Test + public void testHashWithBuildSideOuterJoin2() throws Exception { + final int NUM_KEYS = 40000; + final int BUILD_VALS_PER_KEY = 2; + final int PROBE_VALS_PER_KEY = 1; + + // The keys of probe and build sides are overlapped, so there would be none unmatched build elements + // after probe phase. + + // create a build input that gives 80000 pairs with 2 values sharing the same key + MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 20000 pairs with 1 values sharing a key --- End diff -- should be 40000 pairs > Add OuterJoin strategy with HashTable on outer side > --------------------------------------------------- > > Key: FLINK-2871 > URL: https://issues.apache.org/jira/browse/FLINK-2871 > Project: Flink > Issue Type: New Feature > Components: Local Runtime, Optimizer > Affects Versions: 0.10.0 > Reporter: Fabian Hueske > Assignee: Chengxiang Li > Priority: Minor > > Outer joins are currently supported with two local execution strategies: > - sort-merge join > - hash join where the hash table is built on the inner side. Hence, this > strategy is only supported for left and right outer joins. > In order to support hash-tables on the outer side, we need a special hash > table implementation that gives access to all records which have not been > accessed during the probe phase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)