Repository: flink
Updated Branches:
  refs/heads/master bbc5e29c8 -> 336b95d4e


[FLINK-3722] [runtime] Don't / and % when sorting

Replace division and modulus with addition and subtraction.

This closes #2628


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/336b95d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/336b95d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/336b95d4

Branch: refs/heads/master
Commit: 336b95d4eedc23e5ce37d1739165157e127c65f8
Parents: bbc5e29
Author: Greg Hogan <[email protected]>
Authored: Wed Oct 5 16:13:02 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Apr 26 11:44:48 2017 -0400

----------------------------------------------------------------------
 .../operators/sort/FixedLengthRecordSorter.java |  59 +++++---
 .../runtime/operators/sort/IndexedSortable.java |  38 +++++
 .../operators/sort/NormalizedKeySorter.java     |  66 ++++++---
 .../flink/runtime/operators/sort/QuickSort.java | 147 ++++++++++++++-----
 4 files changed, 234 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 3a44ab5..22dfd29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -19,11 +19,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
@@ -32,6 +27,11 @@ import 
org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  */
@@ -134,6 +134,17 @@ public final class FixedLengthRecordSorter<T> implements 
InMemorySorter<T> {
                this.recordInstance = this.serializer.createInstance();
        }
 
+       @Override
+       public int recordSize() {
+               return recordSize;
+       }
+
+       @Override
+       public int recordsPerSegment() {
+               return recordsPerSegment;
+       }
+
+
        // 
-------------------------------------------------------------------------
        // Memory Segment
        // 
-------------------------------------------------------------------------
@@ -254,30 +265,40 @@ public final class FixedLengthRecordSorter<T> implements 
InMemorySorter<T> {
 
        @Override
        public int compare(int i, int j) {
-               final int bufferNumI = i / this.recordsPerSegment;
+               final int segmentNumberI = i / this.recordsPerSegment;
                final int segmentOffsetI = (i % this.recordsPerSegment) * 
this.recordSize;
-               
-               final int bufferNumJ = j / this.recordsPerSegment;
+
+               final int segmentNumberJ = j / this.recordsPerSegment;
                final int segmentOffsetJ = (j % this.recordsPerSegment) * 
this.recordSize;
-               
-               final MemorySegment segI = this.sortBuffer.get(bufferNumI);
-               final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-               
+
+               return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, 
segmentOffsetJ);
+       }
+
+       @Override
+       public int compare(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+               final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+               final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
                int val = segI.compare(segJ, segmentOffsetI, segmentOffsetJ, 
this.numKeyBytes);
                return this.useNormKeyUninverted ? val : -val;
        }
 
        @Override
        public void swap(int i, int j) {
-               final int bufferNumI = i / this.recordsPerSegment;
+               final int segmentNumberI = i / this.recordsPerSegment;
                final int segmentOffsetI = (i % this.recordsPerSegment) * 
this.recordSize;
-               
-               final int bufferNumJ = j / this.recordsPerSegment;
+
+               final int segmentNumberJ = j / this.recordsPerSegment;
                final int segmentOffsetJ = (j % this.recordsPerSegment) * 
this.recordSize;
-               
-               final MemorySegment segI = this.sortBuffer.get(bufferNumI);
-               final MemorySegment segJ = this.sortBuffer.get(bufferNumJ);
-               
+
+               swap(segmentNumberI, segmentOffsetI, segmentNumberJ, 
segmentOffsetJ);
+       }
+
+       @Override
+       public void swap(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+               final MemorySegment segI = this.sortBuffer.get(segmentNumberI);
+               final MemorySegment segJ = this.sortBuffer.get(segmentNumberJ);
+
                segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, 
segmentOffsetJ, this.recordSize);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
index 6d7d499..c09cf03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/IndexedSortable.java
@@ -30,15 +30,53 @@ public interface IndexedSortable {
        int compare(int i, int j);
 
        /**
+        * Compare records at the given addresses consistent with the semantics 
of
+        * {@link java.util.Comparator#compare(Object, Object)}.
+        *
+        * @param segmentNumberI index of memory segment containing first record
+        * @param segmentOffsetI offset into memory segment containing first 
record
+        * @param segmentNumberJ index of memory segment containing second 
record
+        * @param segmentOffsetJ offset into memory segment containing second 
record
+        * @return a negative integer, zero, or a positive integer as the
+        *         first argument is less than, equal to, or greater than the
+        *         second.
+        */
+       int compare(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, 
int segmentOffsetJ);
+
+       /**
         * Swap items at the given addresses.
         */
        void swap(int i, int j);
 
        /**
+        * Swap records at the given addresses.
+        *
+        * @param segmentNumberI index of memory segment containing first record
+        * @param segmentOffsetI offset into memory segment containing first 
record
+        * @param segmentNumberJ index of memory segment containing second 
record
+        * @param segmentOffsetJ offset into memory segment containing second 
record
+        */
+       void swap(int segmentNumberI, int segmentOffsetI, int segmentNumberJ, 
int segmentOffsetJ);
+
+       /**
         * Gets the number of elements in the sortable.
         * 
         * @return The number of elements.
         */
        int size();
 
+       /**
+        * Gets the size of each record, the number of bytes separating the head
+        * of successive records.
+        *
+        * @return The record size
+        */
+       int recordSize();
+
+       /**
+        * Gets the number of elements in each memory segment.
+        *
+        * @return The number of records per segment
+        */
+       int recordsPerSegment();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 2cade8d..0fd6f38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
@@ -34,6 +29,11 @@ import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  */
@@ -167,7 +167,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                
                // compute the index entry size and limits
                this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
-               this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
+               this.indexEntriesPerSegment = this.segmentSize / 
this.indexEntrySize;
                this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * 
this.indexEntrySize;
                this.swapBuffer = new byte[this.indexEntrySize];
                
@@ -176,6 +176,16 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                this.sortIndex.add(this.currentSortIndexSegment);
        }
 
+       @Override
+       public int recordSize() {
+               return indexEntrySize;
+       }
+
+       @Override
+       public int recordsPerSegment() {
+               return indexEntriesPerSegment;
+       }
+
        // 
-------------------------------------------------------------------------
        // Memory Segment
        // 
-------------------------------------------------------------------------
@@ -345,38 +355,48 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
 
        @Override
        public int compare(int i, int j) {
-               final int bufferNumI = i / this.indexEntriesPerSegment;
+               final int segmentNumberI = i / this.indexEntriesPerSegment;
                final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+               final int segmentNumberJ = j / this.indexEntriesPerSegment;
                final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final MemorySegment segI = this.sortIndex.get(bufferNumI);
-               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-               
+
+               return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, 
segmentOffsetJ);
+       }
+
+       @Override
+       public int compare(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+               final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+               final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
                int val = segI.compare(segJ, segmentOffsetI + OFFSET_LEN, 
segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
-               
+
                if (val != 0 || this.normalizedKeyFullyDetermines) {
                        return this.useNormKeyUninverted ? val : -val;
                }
-               
+
                final long pointerI = segI.getLong(segmentOffsetI) & 
POINTER_MASK;
                final long pointerJ = segJ.getLong(segmentOffsetJ) & 
POINTER_MASK;
-               
+
                return compareRecords(pointerI, pointerJ);
        }
 
        @Override
        public void swap(int i, int j) {
-               final int bufferNumI = i / this.indexEntriesPerSegment;
+               final int segmentNumberI = i / this.indexEntriesPerSegment;
                final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final int bufferNumJ = j / this.indexEntriesPerSegment;
+
+               final int segmentNumberJ = j / this.indexEntriesPerSegment;
                final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final MemorySegment segI = this.sortIndex.get(bufferNumI);
-               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-               
+
+               swap(segmentNumberI, segmentOffsetI, segmentNumberJ, 
segmentOffsetJ);
+       }
+
+       @Override
+       public void swap(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+               final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+               final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
                segI.swapBytes(this.swapBuffer, segJ, segmentOffsetI, 
segmentOffsetJ, this.indexEntrySize);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/336b95d4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
index 474644e..80ba6d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
@@ -26,9 +26,19 @@ public final class QuickSort implements IndexedSorter {
        public QuickSort() {
        }
 
-       private static void fix(IndexedSortable s, int p, int r) {
-               if (s.compare(p, r) > 0) {
-                       s.swap(p, r);
+       /**
+        * Fix the records into sorted order, swapping when the first record is
+        * greater than the second record.
+        *
+        * @param s paged sortable
+        * @param pN page number of first record
+        * @param pO page offset of first record
+        * @param rN page number of second record
+        * @param rO page offset of second record
+        */
+       private static void fix(IndexedSortable s, int pN, int pO, int rN, int 
rO) {
+               if (s.compare(pN, pO, rN, rO) > 0) {
+                       s.swap(pN, pO, rN, rO);
                }
        }
 
@@ -45,85 +55,154 @@ public final class QuickSort implements IndexedSorter {
 
        /**
         * Sort the given range of items using quick sort. {@inheritDoc} If the 
recursion depth falls below
-        * {@link #getMaxDepth},
-        * then switch to {@link HeapSort}.
+        * {@link #getMaxDepth}, then switch to {@link HeapSort}.
         */
        public void sort(final IndexedSortable s, int p, int r) {
-               sortInternal(s, p, r, getMaxDepth(r - p));
+               int recordsPerSegment = s.recordsPerSegment();
+               int recordSize = s.recordSize();
+               int maxOffset = recordSize * (recordsPerSegment - 1);
+
+               int pN = p / recordsPerSegment;
+               int pO = (p % recordsPerSegment) * recordSize;
+
+               int rN = r / recordsPerSegment;
+               int rO = (r % recordsPerSegment) * recordSize;
+
+               sortInternal(s, recordsPerSegment, recordSize, maxOffset, p, 
pN, pO, r, rN, rO, getMaxDepth(r - p));
        }
 
        public void sort(IndexedSortable s) {
                sort(s, 0, s.size());
        }
 
-       private static void sortInternal(final IndexedSortable s, int p, int r, 
int depth) {
+       /**
+        * Sort the given range of items using quick sort. If the recursion 
depth exceeds
+        * {@link #getMaxDepth}, then switch to {@link HeapSort}.
+        *
+        * @param s paged sortable
+        * @param recordsPerSegment number of records per memory segment
+        * @param recordSize number of bytes per record
+        * @param maxOffset offset of the last record in a memory segment
+        * @param p index of first record in range
+        * @param pN page number of first record in range
+        * @param pO page offset of first record in range
+        * @param r index one past the last record in range (exclusive bound)
+        * @param rN page number of the record one past the last in range
+        * @param rO page offset of the record one past the last in range
+        * @param depth recursion depth
+        *
+        * @see #sort(IndexedSortable, int, int)
+        */
+       private static void sortInternal(final IndexedSortable s, int 
recordsPerSegment, int recordSize, int maxOffset,
+                       int p, int pN, int pO, int r, int rN, int rO, int 
depth) {
                while (true) {
                        if (r - p < 13) {
-                               for (int i = p; i < r; ++i) {
-                                       for (int j = i; j > p && s.compare(j - 
1, j) > 0; --j) {
-                                               s.swap(j, j - 1);
+                               // switch to insertion sort
+                               int i = p+1, iN, iO; if (pO == maxOffset) { iN 
= pN+1; iO = 0; } else { iN = pN; iO = pO+recordSize; }
+
+                               while (i < r) {
+                                       int j = i, jN = iN, jO = iO;
+                                       int jd = j-1, jdN, jdO; if (jO == 0) { 
jdN = jN-1; jdO = maxOffset; } else { jdN = jN; jdO = jO-recordSize; }
+
+                                       while (j > p && s.compare(jdN, jdO, jN, 
jO) > 0) {
+                                               s.swap(jN, jO, jdN, jdO);
+
+                                               j = jd; jN = jdN; jO = jdO;
+                                               jd--; if (jdO == 0) { jdN--; 
jdO = maxOffset; } else { jdO -= recordSize; }
                                        }
+
+                                       i++; if (iO == maxOffset) { iN++; iO = 
0; } else { iO += recordSize; }
                                }
                                return;
                        }
+
                        if (--depth < 0) {
-                               // give up
+                               // switch to heap sort
                                alt.sort(s, p, r);
                                return;
                        }
 
+                       int rdN, rdO; if (rO == 0) { rdN = rN-1; rdO = 
maxOffset; } else { rdN = rN; rdO = rO-recordSize; }
+                       int m = (p+r)>>>1, mN = m / recordsPerSegment, mO = (m 
% recordsPerSegment) * recordSize;
+
                        // select, move pivot into first position
-                       fix(s, (p + r) >>> 1, p);
-                       fix(s, (p + r) >>> 1, r - 1);
-                       fix(s, p, r - 1);
+                       fix(s, mN, mO, pN, pO);
+                       fix(s, mN, mO, rdN, rdO);
+                       fix(s, pN, pO, rdN, rdO);
 
                        // Divide
-                       int i = p;
-                       int j = r;
-                       int ll = p;
-                       int rr = r;
+                       int i = p, iN = pN, iO = pO;
+                       int j = r, jN = rN, jO = rO;
+                       int ll = p, llN = pN, llO = pO;
+                       int rr = r, rrN = rN, rrO = rO;
                        int cr;
                        while (true) {
-                               while (++i < j) {
-                                       if ((cr = s.compare(i, p)) > 0) {
+                               i++; if (iO == maxOffset) { iN++; iO = 0; } 
else { iO += recordSize; }
+
+                               while (i < j) {
+                                       if ((cr = s.compare(iN, iO, pN, pO)) > 
0) {
                                                break;
                                        }
-                                       if (0 == cr && ++ll != i) {
-                                               s.swap(ll, i);
+
+                                       if (0 == cr) {
+                                               ll++; if (llO == maxOffset) { 
llN++; llO = 0; } else { llO += recordSize; }
+
+                                               if (ll != i) {
+                                                       s.swap(llN, llO, iN, 
iO);
+                                               }
                                        }
+
+                                       i++; if (iO == maxOffset) { iN++; iO = 
0; } else { iO += recordSize; }
                                }
-                               while (--j > i) {
-                                       if ((cr = s.compare(p, j)) > 0) {
+
+                               j--; if (jO == 0) { jN--; jO = maxOffset; } 
else { jO -= recordSize; }
+
+                               while (j > i) {
+                                       if ((cr = s.compare(pN, pO, jN, jO)) > 
0) {
                                                break;
                                        }
-                                       if (0 == cr && --rr != j) {
-                                               s.swap(rr, j);
+
+                                       if (0 == cr) {
+                                               rr--; if (rrO == 0) { rrN--; 
rrO = maxOffset; } else { rrO -= recordSize; }
+
+                                               if (rr != j) {
+                                                       s.swap(rrN, rrO, jN, 
jO);
+                                               }
                                        }
+
+                                       j--; if (jO == 0) { jN--; jO = 
maxOffset; } else { jO -= recordSize; }
                                }
                                if (i < j) {
-                                       s.swap(i, j);
+                                       s.swap(iN, iO, jN, jO);
                                } else {
                                        break;
                                }
                        }
-                       j = i;
+                       j = i; jN = iN; jO = iO;
                        // swap pivot- and all eq values- into position
                        while (ll >= p) {
-                               s.swap(ll--, --i);
+                               i--; if (iO == 0) { iN--; iO = maxOffset; } 
else { iO -= recordSize; }
+
+                               s.swap(llN, llO, iN, iO);
+
+                               ll--; if (llO == 0) { llN--; llO = maxOffset; } 
else { llO -= recordSize; }
                        }
                        while (rr < r) {
-                               s.swap(rr++, j++);
+                               s.swap(rrN, rrO, jN, jO);
+
+                               rr++; if (rrO == maxOffset) { rrN++; rrO = 0; } 
else { rrO += recordSize; }
+                               j++; if (jO == maxOffset) { jN++; jO = 0; } 
else { jO += recordSize; }
                        }
 
                        // Conquer
                        // Recurse on smaller interval first to keep stack 
shallow
                        assert i != j;
                        if (i - p < r - j) {
-                               sortInternal(s, p, i, depth);
-                               p = j;
+                               sortInternal(s, recordsPerSegment, recordSize, 
maxOffset, p, pN, pO, i, iN, iO, depth);
+                               p = j; pN = jN; pO = jO;
                        } else {
-                               sortInternal(s, j, r, depth);
-                               r = i;
+                               sortInternal(s, recordsPerSegment, recordSize, 
maxOffset, j, jN, jO, r, rN, rO, depth);
+                               r = i; rN = iN; rO = iO;
                        }
                }
        }

Reply via email to