GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r49726803
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
---
@@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws
Exception {
join.close();
this.memManager.release(join.getFreedMemory());
}
+
+ @Test
+ public void testHashWithBuildSideOuterJoin1() throws Exception {
+ final int NUM_KEYS = 20000;
+ final int BUILD_VALS_PER_KEY = 1;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // create a build input that gives 40000 (2 * NUM_KEYS) pairs, with 1 value
per key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs, with 1 value per key;
presumably it matches only half of the build keys, leaving the rest unmatched (TODO confirm)
+ MutableObjectIterator<IntPair> probeInput = new
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is the minimum number of pages required to hash-join
these inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER,
33);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ final MutableHashTable<IntPair, IntPair> join = new
MutableHashTable<IntPair, IntPair>(
+ this.pairBuildSideAccesssor,
this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator,
this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildInput, probeInput, true); // NOTE(review): `true` presumably enables the build-side outer join under test — confirm against MutableHashTable.open
+
+ final IntPair recordReuse = new IntPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableObjectIterator<IntPair> buildSide =
join.getBuildSideIterator();
+ while (buildSide.next(recordReuse) != null) {
+ numRecordsInJoinResult++;
+ }
+ }
+ Assert.assertEquals("Wrong number of records in join result.",
2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+ join.close();
+ this.memManager.release(join.getFreedMemory());
+ }
+
+ @Test
+ public void testHashWithBuildSideOuterJoin2() throws Exception {
+ final int NUM_KEYS = 40000;
+ final int BUILD_VALS_PER_KEY = 2;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // The keys of probe and build sides are overlapped, so there
would be none unmatched build elements
+ // after probe phase.
+
+ // create a build input that gives 40000 pairs with 2 values
sharing the same key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs with 1 values
sharing a key
+ MutableObjectIterator<IntPair> probeInput = new
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform
hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER,
33);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ final MutableHashTable<IntPair, IntPair> join = new
MutableHashTable<IntPair, IntPair>(
+ this.pairBuildSideAccesssor,
this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator,
this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildInput, probeInput, true);
+
+ final IntPair recordReuse = new IntPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableObjectIterator<IntPair> buildSide =
join.getBuildSideIterator();
+ IntPair next = buildSide.next(recordReuse);
--- End diff --
This does not really test the newly added outer join feature.
All build-side keys also appear on the probe side, so no build records remain unmatched after the probe phase.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---