[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15108291#comment-15108291 ]
ASF GitHub Bot commented on FLINK-2871:
---------------------------------------
GitHub user ChengXiangLi commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r50232198
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
@@ -1558,7 +1641,209 @@ public void reset() {
}
} // end HashBucketIterator
+
+ /**
+ * Iterates over all in-memory elements that have not been probed during the probe phase.
+ */
+ public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
+
+ private final TypeSerializer<BT> accessor;
+
+ private final long totalBucketNumber;
+
+ private final int bucketsPerSegmentBits;
+
+ private final int bucketsPerSegmentMask;
+
+ private final MemorySegment[] buckets;
+
+ private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
+
+ private final BitSet probedSet;
+
+ private MemorySegment bucket;
+
+ private MemorySegment[] overflowSegments;
+
+ private HashPartition<BT, PT> partition;
+
+ private int scanCount;
+
+ private int bucketInSegmentOffset;
+
+ private int countInSegment;
+
+ private int numInSegment;
+
+ UnmatchedBuildIterator(
+ TypeSerializer<BT> accessor,
+ long totalBucketNumber,
+ int bucketsPerSegmentBits,
+ int bucketsPerSegmentMask,
+ MemorySegment[] buckets,
+ ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
+ BitSet probedSet) {
+
+ this.accessor = accessor;
+ this.totalBucketNumber = totalBucketNumber;
+ this.bucketsPerSegmentBits = bucketsPerSegmentBits;
+ this.bucketsPerSegmentMask = bucketsPerSegmentMask;
+ this.buckets = buckets;
+ this.partitionsBeingBuilt = partitionsBeingBuilt;
+ this.probedSet = probedSet;
+ init();
+ }
+
+ private void init() {
+ scanCount = -1;
+ while (!moveToNextBucket()) {
+ if (scanCount >= totalBucketNumber) {
+ break;
+ }
+ }
+ }
+
+ public BT next(BT reuse) {
+ while (true) {
+ BT result = nextInBucket(reuse);
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >= totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ public BT next() {
+ while (true) {
+ BT result = nextInBucket();
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >= totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ private boolean moveToNextBucket() {
--- End diff --
The next bucket may have been spilled to disk, so we need a loop here to make
sure we move to the next on-heap bucket.
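To illustrate why the loop is needed, here is a minimal, self-contained Java sketch (not Flink code) under a simplified model: each bucket carries a hypothetical `inMemory` flag, and the probed state is a `BitSet` indexed by record id. `UnmatchedScanSketch` and `Bucket` are illustrative names only. As assumed here, `moveToNextBucket()` returns false both for a spilled bucket and for running past the end, so the caller keeps looping until it lands on an in-memory bucket or every bucket has been visited, mirroring the `while (!moveToNextBucket())` pattern in the diff. Records in spilled buckets are simply skipped; how the real table treats them is outside this excerpt.

```java
import java.util.BitSet;
import java.util.List;

/** Hypothetical, simplified model of scanning for unprobed build records. */
class UnmatchedScanSketch {

    /** Illustrative bucket: record ids plus a flag saying whether it is still in memory. */
    static final class Bucket {
        final boolean inMemory;
        final int[] recordIds;

        Bucket(boolean inMemory, int... recordIds) {
            this.inMemory = inMemory;
            this.recordIds = recordIds;
        }
    }

    private final List<Bucket> buckets;
    private final BitSet probed;   // bit i set => record i was matched while probing
    private int scanCount = -1;    // index of the current bucket, -1 before the first move
    private int posInBucket;       // next position to inspect in the current bucket

    UnmatchedScanSketch(List<Bucket> buckets, BitSet probed) {
        this.buckets = buckets;
        this.probed = probed;
        advance(); // position on the first in-memory bucket (or past the end)
    }

    /** Moves one bucket forward; false if that bucket is spilled or we ran past the end. */
    private boolean moveToNextBucket() {
        scanCount++;
        posInBucket = 0;
        return scanCount < buckets.size() && buckets.get(scanCount).inMemory;
    }

    /** Loops over spilled buckets; returns false once every bucket has been visited. */
    private boolean advance() {
        while (!moveToNextBucket()) {
            if (scanCount >= buckets.size()) {
                return false;
            }
        }
        return true;
    }

    /** Returns the next unprobed record id in the current bucket, or null if there is none. */
    private Integer nextInBucket() {
        if (scanCount >= buckets.size()) {
            return null;
        }
        int[] ids = buckets.get(scanCount).recordIds;
        while (posInBucket < ids.length) {
            int id = ids[posInBucket++];
            if (!probed.get(id)) {
                return id;
            }
        }
        return null;
    }

    /** Returns the next unprobed record id across all in-memory buckets, or null when done. */
    public Integer next() {
        while (true) {
            Integer result = nextInBucket();
            if (result != null) {
                return result;
            }
            if (!advance()) {
                return null;
            }
        }
    }
}
```

With buckets `{0, 1}` (in memory), `{2}` (spilled) and `{3, 4}` (in memory), and records 1 and 3 marked as probed, repeated calls to `next()` yield 0, then 4, then null.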
> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
> Issue Type: New Feature
> Components: Local Runtime, Optimizer
> Affects Versions: 0.10.0
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
> Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this
> strategy is only supported for left and right outer joins.
> In order to support hash tables on the outer side, we need a special hash
> table implementation that gives access to all records which have not been
> accessed during the probe phase.
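A minimal sketch of what access to the unprobed records enables, in plain Java with no Flink types. The hash table is built on the outer (left) side, a `BitSet` records which build records found a match during the probe phase, and a final pass emits the remaining build records padded with nulls. The class name, the `{key, value}` row layout and the string output format are assumptions made for brevity; this is not the MutableHashTable implementation.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical full outer equi-join with the hash table built on the left (outer) side. */
class OuterSideHashJoinSketch {

    /** Joins rows of the form {key, value}; a missing side is rendered as "null". */
    static List<String> fullOuterJoin(List<int[]> left, List<int[]> right) {
        // Build phase: key -> indexes of the left (build) records with that key.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int i = 0; i < left.size(); i++) {
            table.computeIfAbsent(left.get(i)[0], k -> new ArrayList<>()).add(i);
        }
        BitSet probed = new BitSet(left.size());
        List<String> out = new ArrayList<>();

        // Probe phase: emit matches and remember which build records were accessed.
        for (int[] r : right) {
            List<Integer> matches = table.get(r[0]);
            if (matches == null) {
                out.add("null|" + r[1]); // unmatched probe record
                continue;
            }
            for (int i : matches) {
                probed.set(i);
                out.add(left.get(i)[1] + "|" + r[1]);
            }
        }

        // Final phase: emit every build record that was never accessed while probing.
        for (int i = probed.nextClearBit(0); i < left.size(); i = probed.nextClearBit(i + 1)) {
            out.add(left.get(i)[1] + "|null");
        }
        return out;
    }
}
```

For a left outer join with the table on the left side, the branch that emits unmatched probe records would simply be dropped. The final pass over the clear bits is the step that a hash table built only on the inner side cannot provide.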
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)