Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1469#discussion_r49657059
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
---
@@ -1558,7 +1641,209 @@ public void reset() {
}
} // end HashBucketIterator
+
/**
 * Iterates over all build-side elements held in memory that have not been
 * probed (matched) during the probe phase.
 */
public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {

    // Serializer used to read build-side records out of the partitions.
    private final TypeSerializer<BT> accessor;

    // Total number of hash buckets to scan.
    private final long totalBucketNumber;

    // log2 of the number of buckets per memory segment (for segment lookup via shift).
    private final int bucketsPerSegmentBits;

    // Mask to extract a bucket's index within its segment.
    private final int bucketsPerSegmentMask;

    // The memory segments holding the hash table's buckets.
    private final MemorySegment[] buckets;

    // All partitions of the build side; spilled ones are skipped during the scan.
    private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;

    // Per-bucket bit set marking which records were probed (matched).
    private final BitSet probedSet;

    // --- cursor state for the bucket currently being scanned ---

    private MemorySegment bucket;

    private MemorySegment[] overflowSegments;

    private HashPartition<BT, PT> partition;

    // Index of the bucket the scan is currently positioned on (-1 before start).
    private int scanCount;

    private int bucketInSegmentOffset;

    // Number of records in the current bucket.
    private int countInSegment;

    // Position of the next record to read within the current bucket.
    private int numInSegment;
+
/**
 * Creates an iterator over all unmatched build-side records and positions it
 * on the first in-memory bucket.
 *
 * @param accessor serializer for build-side records
 * @param totalBucketNumber total number of buckets to scan
 * @param bucketsPerSegmentBits log2 of buckets per memory segment
 * @param bucketsPerSegmentMask mask for a bucket's index within its segment
 * @param buckets the segments holding the hash table's buckets
 * @param partitionsBeingBuilt all build-side partitions
 * @param probedSet bit set marking records that were probed
 */
UnmatchedBuildIterator(
        TypeSerializer<BT> accessor,
        long totalBucketNumber,
        int bucketsPerSegmentBits,
        int bucketsPerSegmentMask,
        MemorySegment[] buckets,
        ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
        BitSet probedSet) {

    this.accessor = accessor;
    this.totalBucketNumber = totalBucketNumber;
    this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    this.buckets = buckets;
    this.partitionsBeingBuilt = partitionsBeingBuilt;
    this.probedSet = probedSet;
    // position the scan cursor on the first in-memory bucket
    init();
}
+
+ private void init() {
+ scanCount = -1;
+ while (!moveToNextBucket()) {
+ if (scanCount >= totalBucketNumber) {
+ break;
+ }
+ }
+ }
+
+ public BT next(BT reuse) {
+ while (true) {
+ BT result = nextInBucket(reuse);
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >=
totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ public BT next() {
+ while (true) {
+ BT result = nextInBucket();
+ if (result == null) {
+ while (!moveToNextBucket()) {
+ if (scanCount >=
totalBucketNumber) {
+ return null;
+ }
+ }
+ } else {
+ return result;
+ }
+ }
+ }
+
+ private boolean moveToNextBucket() {
+ scanCount++;
+ if (scanCount > totalBucketNumber - 1) {
+ return false;
+ }
+ final int bucketArrayPos = scanCount >>
this.bucketsPerSegmentBits;
+ final int currentBucketInSegmentOffset = (scanCount &
this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+ MemorySegment currentBucket =
this.buckets[bucketArrayPos];
+ final int partitionNumber =
currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+ final HashPartition<BT, PT> p =
this.partitionsBeingBuilt.get(partitionNumber);
+ if (p.isInMemory()) {
+ set(currentBucket, p.overflowSegments, p,
currentBucketInSegmentOffset);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void set(MemorySegment bucket, MemorySegment[]
overflowSegments, HashPartition<BT, PT> partition,
+ int bucketInSegmentOffset) {
+ this.bucket = bucket;
+ this.overflowSegments = overflowSegments;
+ this.partition = partition;
+ this.bucketInSegmentOffset = bucketInSegmentOffset;
+ this.countInSegment =
bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
+ this.numInSegment = 0;
+ }
+
+ public BT nextInBucket(BT reuse) {
+ // loop over all segments that are involved in the
bucket (original bucket plus overflow buckets)
+ while (true) {
+ probedSet.setMemorySegment(bucket,
this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
--- End diff --
This only needs to be done when we move to the next bucket, not for every
record we read.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---