Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656473
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
 ---
    @@ -1558,7 +1641,209 @@ public void reset() {
                }
     
        } // end HashBucketIterator
    +   
    +   /**
    +    * Iterates over all the elements in memory which have not been probed during 
the probe phase.
    +    */
    +   public static class UnmatchedBuildIterator<BT, PT> implements 
MutableObjectIterator<BT> {
    +   
    +           private final TypeSerializer<BT> accessor;
    +   
    +           private final long totalBucketNumber;
    +           
    +           private final int bucketsPerSegmentBits;
    +           
    +           private final int bucketsPerSegmentMask;
    +           
    +           private final MemorySegment[] buckets;
    +           
    +           private final ArrayList<HashPartition<BT, PT>> 
partitionsBeingBuilt;
    +           
    +           private final BitSet probedSet;
    +           
    +           private MemorySegment bucket;
    +           
    +           private MemorySegment[] overflowSegments;
    +           
    +           private HashPartition<BT, PT> partition;
    +           
    +           private int scanCount;
    +           
    +           private int bucketInSegmentOffset;
    +           
    +           private int countInSegment;
    +           
    +           private int numInSegment;
    +           
    +           UnmatchedBuildIterator(
    +                   TypeSerializer<BT> accessor,
    +                   long totalBucketNumber,
    +                   int bucketsPerSegmentBits,
    +                   int bucketsPerSegmentMask,
    +                   MemorySegment[] buckets,
    +                   ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +                   BitSet probedSet) {
    +                   
    +                   this.accessor = accessor;
    +                   this.totalBucketNumber = totalBucketNumber;
    +                   this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +                   this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +                   this.buckets = buckets;
    +                   this.partitionsBeingBuilt = partitionsBeingBuilt;
    +                   this.probedSet = probedSet;
    +                   init();
    +           }
    +           
    +           private void init() {
    +                   scanCount = -1;
    +                   while (!moveToNextBucket()) {
    +                           if (scanCount >= totalBucketNumber) {
    +                                   break;
    +                           }
    +                   }
    +           }
    +           
    +           public BT next(BT reuse) {
    +                   while (true) {
    +                           BT result = nextInBucket(reuse);
    +                           if (result == null) {
    +                                   while (!moveToNextBucket()) {
    +                                           if (scanCount >= 
totalBucketNumber) {
    +                                                   return null;
    +                                           }
    +                                   }
    +                           } else {
    +                                   return result;
    +                           }
    +                   }
    +           }
    +           
    +           public BT next() {
    +                   while (true) {
    +                           BT result = nextInBucket();
    +                           if (result == null) {
    +                                   while (!moveToNextBucket()) {
    +                                           if (scanCount >= 
totalBucketNumber) {
    +                                                   return null;
    +                                           }
    +                                   }
    +                           } else {
    +                                   return result;
    +                           }
    +                   }
    +           }
    +   
    +           private boolean moveToNextBucket() {
    +                   scanCount++;
    +                   if (scanCount > totalBucketNumber - 1) {
    +                           return false;
    +                   }
    +                   final int bucketArrayPos = scanCount >> 
this.bucketsPerSegmentBits;
    +                   final int currentBucketInSegmentOffset = (scanCount & 
this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +                   MemorySegment currentBucket = 
this.buckets[bucketArrayPos];
    +                   final int partitionNumber = 
currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +                   final HashPartition<BT, PT> p = 
this.partitionsBeingBuilt.get(partitionNumber);
    +                   if (p.isInMemory()) {
    +                           set(currentBucket, p.overflowSegments, p, 
currentBucketInSegmentOffset);
    +                           return true;
    +                   } else {
    +                           return false;
    +                   }
    +           }
    +   
    +           private void set(MemorySegment bucket, MemorySegment[] 
overflowSegments, HashPartition<BT, PT> partition,
    +                   int bucketInSegmentOffset) {
    +                   this.bucket = bucket;
    +                   this.overflowSegments = overflowSegments;
    +                   this.partition = partition;
    +                   this.bucketInSegmentOffset = bucketInSegmentOffset;
    +                   this.countInSegment = 
bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +                   this.numInSegment = 0;
    +           }
    +   
    +           public BT nextInBucket(BT reuse) {
    --- End diff --
    
    Change the visibility of `nextInBucket(BT reuse)` to `private`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to