Repository: flink Updated Branches: refs/heads/master bbc5e29c8 -> 336b95d4e
[FLINK-3722] [runtime] Don't / and % when sorting Replace division and modulus with addition and subtraction. This closes #2628 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/336b95d4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/336b95d4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/336b95d4 Branch: refs/heads/master Commit: 336b95d4eedc23e5ce37d1739165157e127c65f8 Parents: bbc5e29 Author: Greg Hogan <[email protected]> Authored: Wed Oct 5 16:13:02 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Apr 26 11:44:48 2017 -0400 ---------------------------------------------------------------------- .../operators/sort/FixedLengthRecordSorter.java | 59 +++++--- .../runtime/operators/sort/IndexedSortable.java | 38 +++++ .../operators/sort/NormalizedKeySorter.java | 66 ++++++--- .../flink/runtime/operators/sort/QuickSort.java | 147 ++++++++++++++----- 4 files changed, 234 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java index 3a44ab5..22dfd29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java @@ -19,11 +19,6 @@ package org.apache.flink.runtime.operators.sort; -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import 
org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.MemorySegment; @@ -32,6 +27,11 @@ import org.apache.flink.runtime.memory.AbstractPagedInputView; import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.apache.flink.util.MutableObjectIterator; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * */ @@ -134,6 +134,17 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> { this.recordInstance = this.serializer.createInstance(); } + @Override + public int recordSize() { + return recordSize; + } + + @Override + public int recordsPerSegment() { + return recordsPerSegment; + } + + // ------------------------------------------------------------------------- // Memory Segment // ------------------------------------------------------------------------- @@ -254,30 +265,40 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> { @Override public int compare(int i, int j) { - final int bufferNumI = i / this.recordsPerSegment; + final int segmentNumberI = i / this.recordsPerSegment; final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize; - - final int bufferNumJ = j / this.recordsPerSegment; + + final int segmentNumberJ = j / this.recordsPerSegment; final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize; - - final MemorySegment segI = this.sortBuffer.get(bufferNumI); - final MemorySegment segJ = this.sortBuffer.get(bufferNumJ); - + + return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortBuffer.get(segmentNumberI); + final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ); + int val = segI.compare(segJ, segmentOffsetI, 
segmentOffsetJ, this.numKeyBytes); return this.useNormKeyUninverted ? val : -val; } @Override public void swap(int i, int j) { - final int bufferNumI = i / this.recordsPerSegment; + final int segmentNumberI = i / this.recordsPerSegment; final int segmentOffsetI = (i % this.recordsPerSegment) * this.recordSize; - - final int bufferNumJ = j / this.recordsPerSegment; + + final int segmentNumberJ = j / this.recordsPerSegment; final int segmentOffsetJ = (j % this.recordsPerSegment) * this.recordSize; - - final MemorySegment segI = this.sortBuffer.get(bufferNumI); - final MemorySegment segJ = this.sortBuffer.get(bufferNumJ); - + + swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortBuffer.get(segmentNumberI); + final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ); + segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.recordSize); } http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java index 6d7d499..c09cf03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java @@ -30,15 +30,53 @@ public interface IndexedSortable { int compare(int i, int j); /** + * Compare records at the given addresses consistent with the semantics of + * {@link java.util.Comparator#compare(Object, Object)}. 
+ + * @param segmentNumberI index of memory segment containing first record + * @param segmentOffsetI offset into memory segment containing first record + * @param segmentNumberJ index of memory segment containing second record + * @param segmentOffsetJ offset into memory segment containing second record + * @return a negative integer, zero, or a positive integer as the + * first argument is less than, equal to, or greater than the + * second. + */ + int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ); + + /** * Swap items at the given addresses. */ void swap(int i, int j); /** + * Swap records at the given addresses. + * + * @param segmentNumberI index of memory segment containing first record + * @param segmentOffsetI offset into memory segment containing first record + * @param segmentNumberJ index of memory segment containing second record + * @param segmentOffsetJ offset into memory segment containing second record + */ + void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ); + + /** * Gets the number of elements in the sortable. * * @return The number of elements. */ int size(); + /** + * Gets the size of each record, the number of bytes separating the head + * of successive records. + * + * @return The record size + */ + int recordSize(); + + /** + * Gets the number of elements in each memory segment. 
+ * + * @return The number of records per segment + */ + int recordsPerSegment(); } http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java index 2cade8d..0fd6f38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.operators.sort; -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.MemorySegment; @@ -34,6 +29,11 @@ import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * */ @@ -167,7 +167,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { // compute the index entry size and limits this.indexEntrySize = this.numKeyBytes + OFFSET_LEN; - this.indexEntriesPerSegment = segmentSize / this.indexEntrySize; + this.indexEntriesPerSegment = this.segmentSize / this.indexEntrySize; this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize; this.swapBuffer = new byte[this.indexEntrySize]; @@ -176,6 +176,16 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { this.sortIndex.add(this.currentSortIndexSegment); } + @Override + public int recordSize() { + return 
indexEntrySize; + } + + @Override + public int recordsPerSegment() { + return indexEntriesPerSegment; + } + // ------------------------------------------------------------------------- // Memory Segment // ------------------------------------------------------------------------- @@ -345,38 +355,48 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { @Override public int compare(int i, int j) { - final int bufferNumI = i / this.indexEntriesPerSegment; + final int segmentNumberI = i / this.indexEntriesPerSegment; final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; - - final int bufferNumJ = j / this.indexEntriesPerSegment; + + final int segmentNumberJ = j / this.indexEntriesPerSegment; final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; - - final MemorySegment segI = this.sortIndex.get(bufferNumI); - final MemorySegment segJ = this.sortIndex.get(bufferNumJ); - + + return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortIndex.get(segmentNumberI); + final MemorySegment segJ = this.sortIndex.get(segmentNumberJ); + int val = segI.compare(segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes); - + if (val != 0 || this.normalizedKeyFullyDetermines) { return this.useNormKeyUninverted ? 
val : -val; } - + final long pointerI = segI.getLong(segmentOffsetI) & POINTER_MASK; final long pointerJ = segJ.getLong(segmentOffsetJ) & POINTER_MASK; - + return compareRecords(pointerI, pointerJ); } @Override public void swap(int i, int j) { - final int bufferNumI = i / this.indexEntriesPerSegment; + final int segmentNumberI = i / this.indexEntriesPerSegment; final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; - - final int bufferNumJ = j / this.indexEntriesPerSegment; + + final int segmentNumberJ = j / this.indexEntriesPerSegment; final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; - - final MemorySegment segI = this.sortIndex.get(bufferNumI); - final MemorySegment segJ = this.sortIndex.get(bufferNumJ); - + + swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortIndex.get(segmentNumberI); + final MemorySegment segJ = this.sortIndex.get(segmentNumberJ); + segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, segmentOffsetJ, this.indexEntrySize); } http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java index 474644e..80ba6d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java @@ -26,9 +26,19 @@ public final class QuickSort implements IndexedSorter { public QuickSort() { } - private static void fix(IndexedSortable s, int p, int r) { - if (s.compare(p, r) > 0) { - 
s.swap(p, r); + /** + * Fix the records into sorted order, swapping when the first record is + * greater than the second record. + * + * @param s paged sortable + * @param pN page number of first record + * @param pO page offset of first record + * @param rN page number of second record + * @param rO page offset of second record + */ + private static void fix(IndexedSortable s, int pN, int pO, int rN, int rO) { + if (s.compare(pN, pO, rN, rO) > 0) { + s.swap(pN, pO, rN, rO); } } @@ -45,85 +55,154 @@ public final class QuickSort implements IndexedSorter { /** * Sort the given range of items using quick sort. {@inheritDoc} If the recursion depth falls below - * {@link #getMaxDepth}, - * then switch to {@link HeapSort}. + * {@link #getMaxDepth}, then switch to {@link HeapSort}. */ public void sort(final IndexedSortable s, int p, int r) { - sortInternal(s, p, r, getMaxDepth(r - p)); + int recordsPerSegment = s.recordsPerSegment(); + int recordSize = s.recordSize(); + int maxOffset = recordSize * (recordsPerSegment - 1); + + int pN = p / recordsPerSegment; + int pO = (p % recordsPerSegment) * recordSize; + + int rN = r / recordsPerSegment; + int rO = (r % recordsPerSegment) * recordSize; + + sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, pN, pO, r, rN, rO, getMaxDepth(r - p)); } public void sort(IndexedSortable s) { sort(s, 0, s.size()); } - private static void sortInternal(final IndexedSortable s, int p, int r, int depth) { + /** + * Sort the given range of items using quick sort. If the recursion depth falls below + * {@link #getMaxDepth}, then switch to {@link HeapSort}. 
+ * + * @param s paged sortable + * @param recordsPerSegment number of records per memory segment + * @param recordSize number of bytes per record + * @param maxOffset offset of the last record in a memory segment + * @param p index of first record in range + * @param pN page number of first record in range + * @param pO page offset of first record in range + * @param r index one past the last record in range (exclusive end) + * @param rN page number of the record at index r (one past the end of the range) + * @param rO page offset of the record at index r (one past the end of the range) + * @param depth recursion depth + * + * @see #sort(IndexedSortable, int, int) + */ + private static void sortInternal(final IndexedSortable s, int recordsPerSegment, int recordSize, int maxOffset, + int p, int pN, int pO, int r, int rN, int rO, int depth) { while (true) { if (r - p < 13) { - for (int i = p; i < r; ++i) { - for (int j = i; j > p && s.compare(j - 1, j) > 0; --j) { - s.swap(j, j - 1); + // switch to insertion sort + int i = p+1, iN, iO; if (pO == maxOffset) { iN = pN+1; iO = 0; } else { iN = pN; iO = pO+recordSize; } + + while (i < r) { + int j = i, jN = iN, jO = iO; + int jd = j-1, jdN, jdO; if (jO == 0) { jdN = jN-1; jdO = maxOffset; } else { jdN = jN; jdO = jO-recordSize; } + + while (j > p && s.compare(jdN, jdO, jN, jO) > 0) { + s.swap(jN, jO, jdN, jdO); + + j = jd; jN = jdN; jO = jdO; + jd--; if (jdO == 0) { jdN--; jdO = maxOffset; } else { jdO -= recordSize; } } + + i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; } } return; } + if (--depth < 0) { - // give up + // switch to heap sort alt.sort(s, p, r); return; } + int rdN, rdO; if (rO == 0) { rdN = rN-1; rdO = maxOffset; } else { rdN = rN; rdO = rO-recordSize; } + int m = (p+r)>>>1, mN = m / recordsPerSegment, mO = (m % recordsPerSegment) * recordSize; + // select, move pivot into first position - fix(s, (p + r) >>> 1, p); - fix(s, (p + r) >>> 1, r - 1); - fix(s, p, r - 1); + fix(s, mN, mO, pN, pO); + fix(s, mN, mO, rdN, rdO); + fix(s, pN, pO, 
rdN, rdO); // Divide - int i = p; - int j = r; - int ll = p; - int rr = r; + int i = p, iN = pN, iO = pO; + int j = r, jN = rN, jO = rO; + int ll = p, llN = pN, llO = pO; + int rr = r, rrN = rN, rrO = rO; int cr; while (true) { - while (++i < j) { - if ((cr = s.compare(i, p)) > 0) { + i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; } + + while (i < j) { + if ((cr = s.compare(iN, iO, pN, pO)) > 0) { break; } - if (0 == cr && ++ll != i) { - s.swap(ll, i); + + if (0 == cr) { + ll++; if (llO == maxOffset) { llN++; llO = 0; } else { llO += recordSize; } + + if (ll != i) { + s.swap(llN, llO, iN, iO); + } } + + i++; if (iO == maxOffset) { iN++; iO = 0; } else { iO += recordSize; } } - while (--j > i) { - if ((cr = s.compare(p, j)) > 0) { + + j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; } + + while (j > i) { + if ((cr = s.compare(pN, pO, jN, jO)) > 0) { break; } - if (0 == cr && --rr != j) { - s.swap(rr, j); + + if (0 == cr) { + rr--; if (rrO == 0) { rrN--; rrO = maxOffset; } else { rrO -= recordSize; } + + if (rr != j) { + s.swap(rrN, rrO, jN, jO); + } } + + j--; if (jO == 0) { jN--; jO = maxOffset; } else { jO -= recordSize; } } if (i < j) { - s.swap(i, j); + s.swap(iN, iO, jN, jO); } else { break; } } - j = i; + j = i; jN = iN; jO = iO; // swap pivot- and all eq values- into position while (ll >= p) { - s.swap(ll--, --i); + i--; if (iO == 0) { iN--; iO = maxOffset; } else { iO -= recordSize; } + + s.swap(llN, llO, iN, iO); + + ll--; if (llO == 0) { llN--; llO = maxOffset; } else { llO -= recordSize; } } while (rr < r) { - s.swap(rr++, j++); + s.swap(rrN, rrO, jN, jO); + + rr++; if (rrO == maxOffset) { rrN++; rrO = 0; } else { rrO += recordSize; } + j++; if (jO == maxOffset) { jN++; jO = 0; } else { jO += recordSize; } } // Conquer // Recurse on smaller interval first to keep stack shallow assert i != j; if (i - p < r - j) { - sortInternal(s, p, i, depth); - p = j; + sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, 
pN, pO, i, iN, iO, depth); + p = j; pN = jN; pO = jO; } else { - sortInternal(s, j, r, depth); - r = i; + sortInternal(s, recordsPerSegment, recordSize, maxOffset, j, jN, jO, r, rN, rO, depth); + r = i; rN = iN; rO = iO; } } }
