Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50390858
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
 ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws 
Exception {
                join.close();
                this.memManager.release(join.getFreedMemory());
        }
    +
    +   @Test
    +   public void testHashWithBuildSideOuterJoin1() throws Exception {
    +           final int NUM_KEYS = 20000;
    +           final int BUILD_VALS_PER_KEY = 1;
    +           final int PROBE_VALS_PER_KEY = 1;
    +
    +           // create a build input that gives 40000 pairs with 1 values 
sharing the same key
    +           MutableObjectIterator<IntPair> buildInput = new 
UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +           // create a probe input that gives 20000 pairs with 1 values 
sharing a key
    +           MutableObjectIterator<IntPair> probeInput = new 
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +           // allocate the memory for the HashTable
    +           List<MemorySegment> memSegments;
    +           try {
    +                   // 33 is the minimum number of pages required to perform 
the hash join on these inputs
    +                   memSegments = this.memManager.allocatePages(MEM_OWNER, 
33);
    +           }
    +           catch (MemoryAllocationException maex) {
    +                   fail("Memory for the Join could not be provided.");
    +                   return;
    +           }
    +
    +           // 
----------------------------------------------------------------------------------------
    +
    +           final MutableHashTable<IntPair, IntPair> join = new 
MutableHashTable<IntPair, IntPair>(
    +                   this.pairBuildSideAccesssor, 
this.pairProbeSideAccesssor,
    +                   this.pairBuildSideComparator, 
this.pairProbeSideComparator, this.pairComparator,
    +                   memSegments, ioManager);
    +           join.open(buildInput, probeInput, true);
    +
    +           final IntPair recordReuse = new IntPair();
    +           int numRecordsInJoinResult = 0;
    +
    +           while (join.nextRecord()) {
    +                   MutableObjectIterator<IntPair> buildSide = 
join.getBuildSideIterator();
    +                   while (buildSide.next(recordReuse) != null) {
    +                           numRecordsInJoinResult++;
    +                   }
    +           }
    +           Assert.assertEquals("Wrong number of records in join result.", 
2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +           join.close();
    +           this.memManager.release(join.getFreedMemory());
    +   }
    +   
    +   @Test
    +   public void testHashWithBuildSideOuterJoin2() throws Exception {
    +           final int NUM_KEYS = 40000;
    +           final int BUILD_VALS_PER_KEY = 2;
    +           final int PROBE_VALS_PER_KEY = 1;
    +           
    +           // The keys of the probe and build sides overlap, so there 
will be no unmatched build elements
    +           // after the probe phase.
    +           
    +           // create a build input that gives 80000 pairs with 2 values 
sharing the same key
    +           MutableObjectIterator<IntPair> buildInput = new 
UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +           
    +           // create a probe input that gives 40000 pairs with 1 value 
per key
    +           MutableObjectIterator<IntPair> probeInput = new 
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +           
    +           // allocate the memory for the HashTable
    +           List<MemorySegment> memSegments;
    +           try {
    +                   // 33 is the minimum number of pages required to perform 
the hash join on these inputs
    +                   memSegments = this.memManager.allocatePages(MEM_OWNER, 
33);
    +           }
    +           catch (MemoryAllocationException maex) {
    +                   fail("Memory for the Join could not be provided.");
    +                   return;
    +           }
    +           
    +           // 
----------------------------------------------------------------------------------------
    +           
    +           final MutableHashTable<IntPair, IntPair> join = new 
MutableHashTable<IntPair, IntPair>(
    +                   this.pairBuildSideAccesssor, 
this.pairProbeSideAccesssor,
    +                   this.pairBuildSideComparator, 
this.pairProbeSideComparator, this.pairComparator,
    +                   memSegments, ioManager);
    +           join.open(buildInput, probeInput, true);
    +           
    +           final IntPair recordReuse = new IntPair();
    +           int numRecordsInJoinResult = 0;
    +           
    +           while (join.nextRecord()) {
    +                   MutableObjectIterator<IntPair> buildSide = 
join.getBuildSideIterator();
    +                   IntPair next = buildSide.next(recordReuse);
    --- End diff --
    
    OK, maybe add a brief comment to make clear which behavior is tested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to