[
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15097075#comment-15097075
]
ASF GitHub Bot commented on FLINK-2871:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r49656863
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
---
@@ -1558,7 +1641,209 @@ public void reset() {
}
} // end HashBucketIterator
+
+ /**
+ * Iterate all the elements in memory which has not been probed during
probe phase.
+ */
+ public static class UnmatchedBuildIterator<BT, PT> implements
MutableObjectIterator<BT> {
+
+ private final TypeSerializer<BT> accessor;
+
+ private final long totalBucketNumber;
+
+ private final int bucketsPerSegmentBits;
+
+ private final int bucketsPerSegmentMask;
+
+ private final MemorySegment[] buckets;
+
+ private final ArrayList<HashPartition<BT, PT>>
partitionsBeingBuilt;
+
+ private final BitSet probedSet;
+
+ private MemorySegment bucket;
+
+ private MemorySegment[] overflowSegments;
+
+ private HashPartition<BT, PT> partition;
+
+ private int scanCount;
+
+ private int bucketInSegmentOffset;
+
+ private int countInSegment;
+
+ private int numInSegment;
+
+ UnmatchedBuildIterator(
+ TypeSerializer<BT> accessor,
+ long totalBucketNumber,
+ int bucketsPerSegmentBits,
+ int bucketsPerSegmentMask,
+ MemorySegment[] buckets,
+ ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
+ BitSet probedSet) {
+
+ this.accessor = accessor;
+ this.totalBucketNumber = totalBucketNumber;
+ this.bucketsPerSegmentBits = bucketsPerSegmentBits;
+ this.bucketsPerSegmentMask = bucketsPerSegmentMask;
+ this.buckets = buckets;
+ this.partitionsBeingBuilt = partitionsBeingBuilt;
+ this.probedSet = probedSet;
+ init();
+ }
+
+ private void init() {
+ scanCount = -1;
+ while (!moveToNextBucket()) {
+ if (scanCount >= totalBucketNumber) {
+ break;
+ }
+ }
+ }
+
+ public BT next(BT reuse) {
+ while (true) {
+ BT result = nextInBucket(reuse);
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >=
totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ public BT next() {
+ while (true) {
+ BT result = nextInBucket();
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >=
totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ private boolean moveToNextBucket() {
+ scanCount++;
+ if (scanCount > totalBucketNumber - 1) {
+ return false;
+ }
+ final int bucketArrayPos = scanCount >>
this.bucketsPerSegmentBits;
+ final int currentBucketInSegmentOffset = (scanCount &
this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+ MemorySegment currentBucket =
this.buckets[bucketArrayPos];
+ final int partitionNumber =
currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+ final HashPartition<BT, PT> p =
this.partitionsBeingBuilt.get(partitionNumber);
+ if (p.isInMemory()) {
+ set(currentBucket, p.overflowSegments, p,
currentBucketInSegmentOffset);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void set(MemorySegment bucket, MemorySegment[]
overflowSegments, HashPartition<BT, PT> partition,
--- End diff --
Rename `set()` to `setBucket()`?
> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
> Issue Type: New Feature
> Components: Local Runtime, Optimizer
> Affects Versions: 0.10.0
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
> Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this
> strategy is only supported for left and right outer joins, not for full outer joins.
> In order to support hash tables on the outer side, we need a special hash
> table implementation that gives access to all records which have not been
> accessed during the probe phase.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)