[ 
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110465#comment-15110465
 ] 

ASF GitHub Bot commented on FLINK-2871:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50390580
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
 ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws 
Exception {
                join.close();
                this.memManager.release(join.getFreedMemory());
        }
    +
    +   @Test
    +   public void testHashWithBuildSideOuterJoin1() throws Exception {
    +           final int NUM_KEYS = 20000;
    +           final int BUILD_VALS_PER_KEY = 1;
    +           final int PROBE_VALS_PER_KEY = 1;
    +
    +           // create a build input that gives 40000 pairs with 1 values 
sharing the same key
    +           MutableObjectIterator<IntPair> buildInput = new 
UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +           // create a probe input that gives 20000 pairs with 1 values 
sharing a key
    +           MutableObjectIterator<IntPair> probeInput = new 
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +           // allocate the memory for the HashTable
    +           List<MemorySegment> memSegments;
    +           try {
    +                   // 33 is minimum number of pages required to perform 
hash join this inputs
    +                   memSegments = this.memManager.allocatePages(MEM_OWNER, 
33);
    +           }
    +           catch (MemoryAllocationException maex) {
    +                   fail("Memory for the Join could not be provided.");
    +                   return;
    +           }
    +
    +           // 
----------------------------------------------------------------------------------------
    +
    +           final MutableHashTable<IntPair, IntPair> join = new 
MutableHashTable<IntPair, IntPair>(
    +                   this.pairBuildSideAccesssor, 
this.pairProbeSideAccesssor,
    +                   this.pairBuildSideComparator, 
this.pairProbeSideComparator, this.pairComparator,
    +                   memSegments, ioManager);
    +           join.open(buildInput, probeInput, true);
    +
    +           final IntPair recordReuse = new IntPair();
    +           int numRecordsInJoinResult = 0;
    +
    +           while (join.nextRecord()) {
    +                   MutableObjectIterator<IntPair> buildSide = 
join.getBuildSideIterator();
    +                   while (buildSide.next(recordReuse) != null) {
    +                           numRecordsInJoinResult++;
    +                   }
    +           }
    +           Assert.assertEquals("Wrong number of records in join result.", 
2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +           join.close();
    +           this.memManager.release(join.getFreedMemory());
    +   }
    +   
    +   @Test
    +   public void testHashWithBuildSideOuterJoin2() throws Exception {
    +           final int NUM_KEYS = 40000;
    +           final int BUILD_VALS_PER_KEY = 2;
    +           final int PROBE_VALS_PER_KEY = 1;
    +           
    +           // The keys of probe and build sides are overlapped, so there 
would be none unmatched build elements
    +           // after probe phase.
    +           
    +           // create a build input that gives 80000 pairs with 2 values 
sharing the same key
    +           MutableObjectIterator<IntPair> buildInput = new 
UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +           
    +           // create a probe input that gives 20000 pairs with 1 values 
sharing a key
    --- End diff --
    
    should be 40000 pairs


> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
>                 Key: FLINK-2871
>                 URL: https://issues.apache.org/jira/browse/FLINK-2871
>             Project: Flink
>          Issue Type: New Feature
>          Components: Local Runtime, Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>            Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this 
> strategy is only supported for left and right outer joins.
> In order to support hash-tables on the outer side, we need a special hash 
> table implementation that gives access to all records which have not been 
> accessed during the probe phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to