[
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110468#comment-15110468
]
ASF GitHub Bot commented on FLINK-2871:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r50390858
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
---
@@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws
Exception {
join.close();
this.memManager.release(join.getFreedMemory());
}
+
+ @Test
+ public void testHashWithBuildSideOuterJoin1() throws Exception {
+ final int NUM_KEYS = 20000;
+ final int BUILD_VALS_PER_KEY = 1;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // create a build input that gives 40000 pairs with 1 values
sharing the same key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs with 1 values
sharing a key
+ MutableObjectIterator<IntPair> probeInput = new
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform
hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER,
33);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ final MutableHashTable<IntPair, IntPair> join = new
MutableHashTable<IntPair, IntPair>(
+ this.pairBuildSideAccesssor,
this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator,
this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildInput, probeInput, true);
+
+ final IntPair recordReuse = new IntPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableObjectIterator<IntPair> buildSide =
join.getBuildSideIterator();
+ while (buildSide.next(recordReuse) != null) {
+ numRecordsInJoinResult++;
+ }
+ }
+ Assert.assertEquals("Wrong number of records in join result.",
2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+ join.close();
+ this.memManager.release(join.getFreedMemory());
+ }
+
+ @Test
+ public void testHashWithBuildSideOuterJoin2() throws Exception {
+ final int NUM_KEYS = 40000;
+ final int BUILD_VALS_PER_KEY = 2;
+ final int PROBE_VALS_PER_KEY = 1;
+
+ // The keys of probe and build sides are overlapped, so there
would be none unmatched build elements
+ // after probe phase.
+
+ // create a build input that gives 40000 pairs with 2 values
sharing the same key
+ MutableObjectIterator<IntPair> buildInput = new
UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+ // create a probe input that gives 20000 pairs with 1 values
sharing a key
+ MutableObjectIterator<IntPair> probeInput = new
UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform
hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER,
33);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ final MutableHashTable<IntPair, IntPair> join = new
MutableHashTable<IntPair, IntPair>(
+ this.pairBuildSideAccesssor,
this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator,
this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildInput, probeInput, true);
+
+ final IntPair recordReuse = new IntPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableObjectIterator<IntPair> buildSide =
join.getBuildSideIterator();
+ IntPair next = buildSide.next(recordReuse);
--- End diff --
OK, maybe add a brief comment to make clear which behavior is tested.
> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
> Issue Type: New Feature
> Components: Local Runtime, Optimizer
> Affects Versions: 0.10.0
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
> Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this
> strategy is only supported for left and right outer joins, not for full outer joins.
> In order to support hash tables on the outer side, we need a special hash
> table implementation that gives access to all records that were not
> accessed during the probe phase.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)