This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new f78d1da Accord: PreLoadContext must properly and consistently support
ranges
f78d1da is described below
commit f78d1da27b09f89417dd29bde0529f12cd744e3d
Author: David Capwell <[email protected]>
AuthorDate: Fri Mar 29 10:03:44 2024 -0700
Accord: PreLoadContext must properly and consistently support ranges
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-19355
---
.../src/main/java/accord/local/CommandsForKey.java | 1 -
.../java/accord/utils/CheckpointIntervalArray.java | 222 ++++
...er.java => CheckpointIntervalArrayBuilder.java} | 211 ++--
.../src/main/java/accord/utils/RandomSource.java | 10 +-
.../java/accord/utils/SearchableRangeList.java | 197 +---
.../accord/utils/SearchableRangeListBuilder.java | 1067 +-------------------
.../main/java/accord/utils/async/AsyncChains.java | 12 +
.../src/main/java/accord/utils/random/Picker.java | 27 +-
accord-core/src/test/java/accord/utils/Gen.java | 30 +
accord-core/src/test/java/accord/utils/Gens.java | 307 ++++++
.../src/test/java/accord/utils/Property.java | 144 ++-
.../java/accord/utils/SearchableRangeListTest.java | 116 +++
.../src/main/groovy/accord.java-conventions.gradle | 2 +-
13 files changed, 1023 insertions(+), 1323 deletions(-)
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java
b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 0e24d67..9e628b9 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -1698,7 +1698,6 @@ public class CommandsForKey implements CommandsSummary
if (o == null || getClass() != o.getClass()) return false;
CommandsForKey that = (CommandsForKey) o;
return Objects.equals(key, that.key)
- && Objects.equals(redundantBefore, that.redundantBefore)
&& Arrays.equals(txns, that.txns);
}
diff --git
a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
new file mode 100644
index 0000000..84063da
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Arrays;
+
+import accord.utils.CheckpointIntervalArrayBuilder.Accessor;
+import net.nicoulaj.compilecommand.annotations.Inline;
+
+import static accord.utils.SortedArrays.Search.CEIL;
+
+public class CheckpointIntervalArray<Ranges, Range, Key>
+{
+ // scan distance can be kept very small as we guarantee to use at most
linear extra space even with a scan distance of zero
+ static final int MAX_SCAN_DISTANCE = 255;
+ protected static final int BIT30 = 0x40000000;
+ protected static final int BIT29 = 0x20000000;
+
+ final Ranges ranges;
+
+ /**
+ * The lower bound for each checkpoint.
+ * The checkpoint {@code i} applies to all ranges (incl) starting from
{@code lowerBounds[i]},
+ * but before (excl) {@code lowerBounds[i+1]}.
+ */
+ final int[] lowerBounds;
+
+ /**
+ * Logically one entry per checkpoint, mapping {@link #lowerBounds} to
{@link #checkpointLists},
+ * however we also encode an additional byte per entry representing the
scan distance for the
+ * ranges handled by this checkpoint. These are grouped into an integer
per four mappings, i.e.
+ * we encode batches of five ints, with the first int containing the four
scan distances for the
+ * next four checkpoints, and the following four ints containing the
respective offsets into
+ * {@link #checkpointLists}.
+ * <p>
+ *
[0.........32b.........64b.........96b........128b........160b........192b]
+ * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7
d8 ]
+ */
+ final int[] headers;
+
+ /**
+ * A list of indexes in {@link #ranges} contained by each checkpoint;
checkpoints are
+ * mapped from {@link #lowerBounds} by {@link #headers}.
+ * <p>
+ * Entries are sorted in descending order by the end of the range they
cover, so that
+ * a search of this collection may terminate as soon as it encounters a
range that does
+ * not cover the item we are searching for.
+ * <p>
+ * This collection may contain negative values, in which case these point
to other
+ * checkpoints, whose <i>direct</i> contents (i.e. the positive values of)
we may
+ * search.
+ * <ul> if negative, points to an earlier checkpoint, and:
+ * <li>if the 30th bit is set, the low 20 bits point to checkpointLists,
+ * and the 9 bits in-between provide the length of the range</li>
+ * <li>otherwise, if the 29th bit is set, the lower 29 bits point to
checkpointLists,
+ * and can be iterated safely without an endIndex</li>
+ * <li>otherwise, the low 29 bits provide the length of the run, and the
low 31 bits
+ * of the following entry (which will also be negative) provide a
pointer to
+ * checkpointLists</li>
+ * </ul>
+ */
+ final int[] checkpointLists;
+
+ public final int maxScanAndCheckpointMatches;
+ private final Accessor<Ranges, Range, Key> accessor;
+
+ public CheckpointIntervalArray(Accessor<Ranges, Range, Key> accessor,
Ranges ranges,
+ int[] lowerBounds, int[] headers, int[]
checkpointLists, int maxScanAndCheckpointMatches)
+ {
+ this.accessor = accessor;
+ this.ranges = ranges;
+ this.lowerBounds = lowerBounds;
+ this.headers = headers;
+ this.checkpointLists = checkpointLists;
+ this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
+ }
+
+ @Inline
+ public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1,
P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4>
forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+ {
+ return forEach(accessor.start(range), accessor.end(range),
forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ }
+
+ public <P1, P2, P3, P4> int forEach(Key startKey, Key endKey,
IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint,
IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4
p4, int minIndex)
+ {
+ if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges))
+ return minIndex;
+
+ var c = accessor.keyComparator();
+ int end = accessor.binarySearch(ranges, minIndex,
accessor.size(ranges), endKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL);
+ if (end < 0) end = -1 - end;
+ if (end <= minIndex) return minIndex;
+
+ int floor = accessor.binarySearch(ranges, minIndex,
accessor.size(ranges), startKey, (a, b) -> c.compare(a, accessor.start(b)),
CEIL);
+ int start = floor;
+ if (floor < 0)
+ {
+ // if there's no precise match on start, step backwards;
+ // if this range does not overlap us, step forwards again for start
+ // but retain the floor index for performing scan and checkpoint
searches from
+ // as this contains all ranges that might overlap us (whereas
those that end
+ // after us but before the next range's start would be missed by
the next range index)
+ start = floor = -2 - floor;
+ if (start < 0)
+ start = floor = 0;
+ else if (c.compare(accessor.end(ranges, start), startKey) <= 0)
+ ++start;
+ }
+
+ // Since endInclusive() != startInclusive(), there is no need to adjust
start/end comparisons
+ return forEach(start, end, floor, startKey, 0,
forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ }
+
+ @Inline
+ protected <P1, P2, P3, P4> int forEach(int start, int end, int floor, Key
startBound, int cmpStartBoundWithEnd,
+ IndexedQuadConsumer<P1, P2, P3, P4>
forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
+ P1 p1, P2 p2, P3 p3, P4 p4, int
minIndex)
+ {
+ if (start < minIndex) start = minIndex;
+
+ // find the checkpoint array, so we know how far to step back
+ int checkpoint = Arrays.binarySearch(lowerBounds, floor);
+ if (checkpoint < 0) checkpoint = -2 - checkpoint;
+ if (checkpoint < 0) return end;
+
+ int headerBaseIndex = (checkpoint / 4) * 5;
+ int headerSubIndex = checkpoint & 3;
+ int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
+
+ int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex))
& 0xff;
+ int checkpointStart = headers[headerListIndex];
+ int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4];
// skip the next header
+
+ if (scanDistance == MAX_SCAN_DISTANCE)
+ {
+ scanDistance = -checkpointLists[checkpointStart++];
+ Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
+ }
+
+ // NOTE: we visit in approximately ascending order, and this is a
requirement for correctness of RangeDeps builders
+ // Only the checkpoint is visited in uncertain order, but it is
visited entirely, before the scan matches
+ // and the range matches
+ int minScanIndex = Math.max(floor - scanDistance, minIndex);
+ var c = accessor.keyComparator();
+ for (int i = checkpointStart; i < checkpointEnd ; ++i)
+ {
+ int ri = checkpointLists[i];
+ if (ri < 0)
+ {
+ int subStart, subEnd;
+ if ((ri & BIT30) != 0)
+ {
+ subStart = ri & 0xfffff;
+ subEnd = subStart + ((ri >>> 20) & 0x1ff);
+ }
+ else if ((ri & BIT29) != 0)
+ {
+ subStart = ri & 0x1fffffff;
+ subEnd = Integer.MAX_VALUE;
+ }
+ else
+ {
+ int length = ri & 0x1fffffff;
+ subStart = checkpointLists[++i];
+ subEnd = subStart + length;
+ }
+
+ for (int j = subStart ; j < subEnd ; ++j)
+ {
+ ri = checkpointLists[j];
+ if (ri < 0)
+ continue;
+
+ if (c.compare(accessor.end(ranges, ri), startBound) <=
cmpStartBoundWithEnd)
+ break;
+
+ if (ri >= minIndex && ri < minScanIndex)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+ }
+ }
+ else
+ {
+ // if startBound is a key, we cannot be equal to it;
+ // if startBound is a Range start, we also cannot be equal to
it due to the requirement that
+ // endInclusive() != startInclusive(), so equality really
means inequality
+ if (c.compare(accessor.end(ranges, ri), startBound) <=
cmpStartBoundWithEnd)
+ break;
+
+ if (ri >= minIndex && ri < minScanIndex)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+ }
+ }
+
+ for (int i = minScanIndex; i < floor ; ++i)
+ {
+ if (c.compare(accessor.end(ranges, i), startBound) >
cmpStartBoundWithEnd)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
+ }
+
+ if (start == end)
+ return end;
+
+ forEachRange.accept(p1, p2, p3, p4, start, end);
+ return end;
+ }
+}
diff --git
a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
similarity index 83%
copy from accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
copy to
accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
index 77d351a..95e1d86 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
@@ -18,26 +18,18 @@
package accord.utils;
-import accord.api.RoutingKey;
-import accord.primitives.Range;
-import accord.utils.ArrayBuffers.IntBuffers;
-
-import javax.annotation.Nonnull;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import static accord.utils.ArrayBuffers.cachedInts;
-import static accord.utils.SearchableRangeList.MAX_SCAN_DISTANCE;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
+import static accord.utils.CheckpointIntervalArray.MAX_SCAN_DISTANCE;
import static accord.utils.SortedArrays.Search.CEIL;
-/**
- * Builder for {@link SearchableRangeList}
- */
-public class SearchableRangeListBuilder
+public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey>
{
public enum Strategy
{
@@ -75,53 +67,80 @@ public class SearchableRangeListBuilder
NO_LINKS
}
+ public interface Accessor<Ranges, Range, RoutingKey>
+ {
+ int size(Ranges ranges);
+ Range get(Ranges ranges, int index);
+ RoutingKey start(Ranges ranges, int index);
+ RoutingKey start(Range range);
+ RoutingKey end(Ranges ranges, int index);
+ RoutingKey end(Range range);
+ Comparator<RoutingKey> keyComparator();
+ int binarySearch(Ranges ranges, int from, int to, RoutingKey find,
AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op);
+ }
+
private static final int BIT31 = 0x80000000;
private static final int BIT30 = 0x40000000;
private static final int BIT29 = 0x20000000;
static final int MIN_INDIRECT_LINK_LENGTH = 2;
+ final Accessor<Ranges, Range, RoutingKey> accessor;
final boolean isAccurate;
final boolean withLinks;
- final Range[] ranges;
+ final Ranges ranges;
int[] bounds;
int[] headers;
int[] lists;
int checkpointCount, headerPointer, listCount;
- final Scan scan = new Scan();
- final TenuredSet tenured = new TenuredSet();
- final PendingCheckpoint pending = new PendingCheckpoint();
+ final Scan<Ranges, Range, RoutingKey> scan;
+ final TenuredSet<Ranges, Range, RoutingKey> tenured;
+ final PendingCheckpoint<Ranges, Range, RoutingKey> pending = new
PendingCheckpoint<>();
// track the maximum possible number of entries we can match with both a
scan + checkpoint lookup
// this is an over-estimate and may be used by consumers to allocate
out-of-order buffers for visitations
int maxScanAndCheckpointMatches;
- public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links
links)
+ public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey>
accessor,
+ Ranges ranges,
+ Strategy strategy, Links links)
{
- this(ranges, Math.min(MAX_SCAN_DISTANCE, 34 -
Integer.numberOfLeadingZeros(ranges.length)), strategy, links);
+ this(accessor, ranges, Math.min(MAX_SCAN_DISTANCE, 34 -
Integer.numberOfLeadingZeros(accessor.size(ranges))), strategy, links);
}
- public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance,
Strategy strategy, Links links)
+ public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey>
accessor,
+ Ranges ranges,
+ int goalScanDistance,
+ Strategy strategy, Links links)
{
- this.isAccurate = strategy == ACCURATE;
- this.withLinks = links == LINKS;
+ this.accessor = accessor;
+ this.isAccurate = strategy == Strategy.ACCURATE;
+ this.withLinks = links == Links.LINKS;
Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
Invariants.checkArgument(goalScanDistance > 0);
this.ranges = ranges;
+ this.scan = new Scan<>(accessor);
+ this.tenured = new TenuredSet<>(accessor);
init(ranges, goalScanDistance);
}
- void init(Range[] ranges, int goalScanDistance)
+ void init(Ranges ranges, int goalScanDistance)
{
// we write checkpoints at least goalScanDistance apart
scan.init(goalScanDistance);
- IntBuffers cachedInts = cachedInts();
+ ArrayBuffers.IntBuffers cachedInts = cachedInts();
// ask for int buffers in descending order of size
- this.lists = cachedInts.getInts(ranges.length); // this one might need
to grow
+ int size = accessor.size(ranges);
+ this.lists = cachedInts.getInts(size); // this one might need to grow
// +2 to round-up each division, and +2 to account for the final entry
(which might require an empty scan distance header)
- this.headers = cachedInts.getInts(((ranges.length / goalScanDistance)
* 5) / 4 + 4);
- this.bounds = cachedInts.getInts(ranges.length / goalScanDistance + 1);
+ this.headers = cachedInts.getInts(((size / goalScanDistance) * 5) / 4
+ 4);
+ this.bounds = cachedInts.getInts(size / goalScanDistance + 1);
+ }
+
+ public interface Factory<T, Ranges>
+ {
+ T build(Ranges ranges, int[] bounds, int[] headers, int[] lists, int
maxScanAndCheckpointMatches);
}
/**
@@ -141,9 +160,15 @@ public class SearchableRangeListBuilder
* discounted by the number of entries we have removed from the tenured
collection since
* the last checkpoint.
*/
- public SearchableRangeList build()
+ public CheckpointIntervalArray<Ranges, Range, RoutingKey> build()
{
- for (int ri = 0 ; ri < ranges.length ; ++ri)
+ return build((ranges, bounds, headers, lists,
maxScanAndCheckpointMatches) -> new CheckpointIntervalArray<>(accessor, ranges,
bounds, headers, lists, maxScanAndCheckpointMatches));
+ }
+
+ public <T> T build(Factory<T, Ranges> factory)
+ {
+ int size = accessor.size(ranges);
+ for (int ri = 0 ; ri < size ; ++ri)
{
// write a checkpoint if we meet our linear space complexity
requirements
// and we either have a tenured range that we must scan,
@@ -157,14 +182,14 @@ public class SearchableRangeListBuilder
}
// write our final pending checkpoint
- writeCheckpoint(ranges.length);
+ writeCheckpoint(size);
closeHeaders();
- IntBuffers cachedInts = cachedInts();
+ ArrayBuffers.IntBuffers cachedInts = cachedInts();
int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
int[] headers = cachedInts.completeAndDiscard(this.headers,
headerPointer);
int[] bounds = cachedInts.completeAndDiscard(this.bounds,
checkpointCount);
- return new SearchableRangeList(ranges, bounds, headers, lists,
maxScanAndCheckpointMatches);
+ return factory.build(ranges, bounds, headers, lists,
maxScanAndCheckpointMatches);
}
/**
@@ -179,7 +204,7 @@ public class SearchableRangeListBuilder
Invariants.checkArgument(index >= 0);
// then either migrate the index to pendingTenured, or ensure it will
be scanned
- RoutingKey end = ranges[index].end();
+ RoutingKey end = accessor.end(ranges, index);
int scanLimit = scanLimit(index, isAccurate ? scan.goal :
maxScanDistance());
if (shouldTenure(end, scanLimit))
{
@@ -220,12 +245,12 @@ public class SearchableRangeListBuilder
*/
private int scanLimit(int atIndex, int maxScanDistance)
{
- return Math.min(1 + atIndex + maxScanDistance, ranges.length);
+ return Math.min(1 + atIndex + maxScanDistance, accessor.size(ranges));
}
private boolean shouldTenure(RoutingKey end, int scanLimit)
{
- return scanLimit < ranges.length &&
end.compareTo(ranges[scanLimit].start()) > 0;
+ return scanLimit < accessor.size(ranges) &&
accessor.keyComparator().compare(end, accessor.start(ranges, scanLimit)) > 0;
}
private boolean canWriteCheckpoint(int atIndex)
@@ -298,7 +323,7 @@ public class SearchableRangeListBuilder
// TODO (low priority, efficiency): we can shift back the existing
scanDistance if it's far enough from
// the next checkpoint. this might permit us to skip some
comparisons
scan.resetPeakMax(tenured);
- for (Tenured tenured : this.tenured)
+ for (Tenured<Ranges, Range, RoutingKey> tenured : this.tenured)
{
int distanceToEnd = (tenured.lastIndex - checkpointIndex);
if (distanceToEnd >= scan.peakMax)
@@ -314,7 +339,7 @@ public class SearchableRangeListBuilder
int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
while (ri < checkpointIndex)
{
- RoutingKey end = ranges[ri].end();
+ RoutingKey end = accessor.end(ranges, ri);
int scanLimit = scanLimit(ri, scan.peakMax);
if (!shouldTenure(end, scanLimit))
scan.update(end, ri, ranges, scanLimit, null);
@@ -342,7 +367,7 @@ public class SearchableRangeListBuilder
int ri = Math.min(minUntenuredIndex, minScanIndex);
while (ri < checkpointIndex)
{
- RoutingKey end = ranges[ri].end();
+ RoutingKey end = accessor.end(ranges, ri);
int newPeakMax = scan.newPeakMax(tenured);
int scanLimit = scanLimit(ri, newPeakMax);
if (shouldTenure(end, scanLimit))
@@ -381,7 +406,7 @@ public class SearchableRangeListBuilder
int target = (maxScanDistance - scanDistance)/2;
for (int i = 0 ; i < pending.count() ; ++i)
{
- Tenured t = pending.get(i);
+ Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
if (t.index < 0)
continue;
@@ -406,12 +431,12 @@ public class SearchableRangeListBuilder
return scanDistance;
}
- int writeList(PendingCheckpoint pending)
+ int writeList(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
{
int startIndex = listCount;
for (int i = pending.count() - 1 ; i >= 0 ; --i)
{
- Tenured t = pending.get(i);
+ Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
if (t.index >= 0)
{
lists[listCount++] = t.index;
@@ -467,8 +492,9 @@ public class SearchableRangeListBuilder
lists = cachedInts().resize(lists, listCount, lists.length +
lists.length/2 + maxPendingSize);
}
- static class Scan
+ static class Scan<Ranges, Range, RoutingKey>
{
+ final Accessor<Ranges, Range, RoutingKey> accessor;
/** the scan distance we are aiming for; should be proportional to
log2(N) */
int goal;
@@ -487,18 +513,23 @@ public class SearchableRangeListBuilder
/** The maximum (i.e. initial) scan distance limit we have used since
the last attempted checkpoint write */
int peakMax;
+ Scan(Accessor<Ranges, Range, RoutingKey> accessor)
+ {
+ this.accessor = accessor;
+ }
+
void init(int goalScanDistance)
{
goal = peakMax = goalScanDistance;
}
- private void update(RoutingKey end, int atIndex, Range[] ranges, int
scanLimit, SearchableRangeListBuilder checkpoint)
+ private void update(RoutingKey end, int atIndex, Ranges ranges, int
scanLimit, CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
{
int newScanDistance = find(end, atIndex, ranges, scanLimit,
watermark);
updateScanDistance(atIndex, newScanDistance, checkpoint);
}
- private void updateScanDistance(int atIndex, int newScanDistance,
SearchableRangeListBuilder checkpoint)
+ private void updateScanDistance(int atIndex, int newScanDistance,
CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
{
if (newScanDistance > watermark)
{
@@ -518,9 +549,10 @@ public class SearchableRangeListBuilder
}
}
- private int find(RoutingKey end, int atIndex, Range[] ranges, int
scanLimit, int currentScanDistance)
+ private int find(RoutingKey end, int atIndex, Ranges ranges, int
scanLimit, int currentScanDistance)
{
- int lowerIndex = SortedArrays.exponentialSearch(ranges, atIndex +
currentScanDistance, scanLimit, end, (e, s) -> e.compareTo(s.start()), CEIL);
+ var c = accessor.keyComparator();
+ int lowerIndex = accessor.binarySearch(ranges, atIndex +
currentScanDistance, scanLimit, end, (e, s) -> c.compare(e, accessor.start(s)),
CEIL);
if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
else lowerIndex -= 1;
return lowerIndex - atIndex;
@@ -557,7 +589,7 @@ public class SearchableRangeListBuilder
* will bounce around triggering checkpoints due to the larger scan
distance, resetting the scan distance
* and starting again
*/
- boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured)
+ boolean hasMaybeDivergedFromMatchSize(TenuredSet<Ranges, Range,
RoutingKey> tenured)
{
return isAboveGoal() && tenured.count() < watermark()/2;
}
@@ -622,7 +654,7 @@ public class SearchableRangeListBuilder
watermark = 0;
}
- void resetPeakMax(TenuredSet tenured)
+ void resetPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
{
peakMax = newPeakMax(tenured);
}
@@ -632,7 +664,7 @@ public class SearchableRangeListBuilder
return peakMax;
}
- int newPeakMax(TenuredSet tenured)
+ int newPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
{
return Math.max(goal, tenured.count());
}
@@ -640,7 +672,7 @@ public class SearchableRangeListBuilder
/**
* The minimum index containing a range that might need to be tenured,
if we have a smaller max scan distance than before
*/
- int minUntenuredIndex(int checkpointIndex, TenuredSet tenured)
+ int minUntenuredIndex(int checkpointIndex, TenuredSet<Ranges, Range,
RoutingKey> tenured)
{
int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) -
watermark());
// the maximum scan distance that cxould ever have been adopted
for the ranges processed since last checkpoint
@@ -692,8 +724,9 @@ public class SearchableRangeListBuilder
/**
* Record-keeping for a range we have decided is not scannable
*/
- static class Tenured implements Comparable<Tenured>
+ static class Tenured<Ranges, Range, RoutingKey> implements
Comparable<Tenured<Ranges, Range, RoutingKey>>
{
+ final Accessor<Ranges, Range, RoutingKey> accessor;
/**
* The end of the tenured range covered by the contents referred to be
{@link #index}
*/
@@ -723,7 +756,7 @@ public class SearchableRangeListBuilder
* list so that its entries are re-inserted in the next
checkpoint</li>
* </ul>
*/
- Tenured next;
+ Tenured<Ranges, Range, RoutingKey> next;
/**
* Only set for link entries, i.e. where {@code index < 0}.
@@ -734,16 +767,17 @@ public class SearchableRangeListBuilder
*/
int linkLength;
- Tenured(RoutingKey end, int index)
+ Tenured(Accessor<Ranges, Range, RoutingKey> accessor, RoutingKey end,
int index)
{
+ this.accessor = accessor;
this.end = end;
this.index = index;
}
@Override
- public int compareTo(@Nonnull Tenured that)
+ public int compareTo(@Nonnull Tenured<Ranges, Range, RoutingKey> that)
{
- int c = this.end.compareTo(that.end);
+ int c = accessor.keyComparator().compare(this.end, that.end);
// we sort indexes in reverse order so later tenured items find
the earlier ones with same end when searching
// for higher entries for the range of indexes to search, and
if (c == 0) c = -Integer.compare(this.index, that.index);
@@ -766,8 +800,9 @@ public class SearchableRangeListBuilder
* to use a collection that permits us to insert and return a finger into
the tree so we can find the
* successor as part of insertion, and that permits constant-time first()
calls
*/
- static class TenuredSet extends TreeSet<Tenured>
+ static class TenuredSet<Ranges, Range, RoutingKey> extends
TreeSet<Tenured<Ranges, Range, RoutingKey>>
{
+ final Accessor<Ranges, Range, RoutingKey> accessor;
/**
* the number of direct tenured entries (i.e. ignoring link entries)
* this is used to provide a minimum bound on the number of results a
range query can return
@@ -778,7 +813,12 @@ public class SearchableRangeListBuilder
int minSpan;
// a stack of recently used EndAndIndex objects - used only for the
duration of a single build
- Tenured reuse, pendingReuse, pendingReuseTail;
+ Tenured<Ranges, Range, RoutingKey> reuse, pendingReuse,
pendingReuseTail;
+
+ TenuredSet(Accessor<Ranges, Range, RoutingKey> accessor)
+ {
+ this.accessor = accessor;
+ }
int count()
{
@@ -805,38 +845,39 @@ public class SearchableRangeListBuilder
return minSpan;
}
- private int tenure(RoutingKey end, int index, Range[] ranges, int
minUntenureIndex)
+ private int tenure(RoutingKey end, int index, Ranges ranges, int
minUntenureIndex)
{
- return tenure(newTenured(end, index), ranges, minUntenureIndex,
ranges.length);
+ return tenure(newTenured(end, index), ranges, minUntenureIndex,
accessor.size(ranges));
}
- private void tenure(RoutingKey end, int index, Range[] ranges, int
minUntenureIndex, int untenureLimit)
+ private void tenure(RoutingKey end, int index, Ranges ranges, int
minUntenureIndex, int untenureLimit)
{
tenure(newTenured(end, index), ranges, minUntenureIndex,
untenureLimit);
}
- private int tenure(Tenured tenure, Range[] ranges, int
untenureMinIndex, int untenureLimit)
+ private int tenure(Tenured<Ranges, Range, RoutingKey> tenure, Ranges
ranges, int untenureMinIndex, int untenureLimit)
{
if (!add(tenure))
return tenure.lastIndex;
- Tenured next = higher(tenure);
+ Tenured<Ranges, Range, RoutingKey> next = higher(tenure);
if (next != null)
untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
- int untenureIndex = SortedArrays.binarySearch(ranges,
untenureMinIndex, untenureLimit, tenure.end, (e, s) -> e.compareTo(s.start()),
CEIL);
+ var c = accessor.keyComparator();
+ int untenureIndex = accessor.binarySearch(ranges,
untenureMinIndex, untenureLimit, tenure.end, (e, s) -> c.compare(e,
accessor.start(s)), CEIL);
if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
tenure.lastIndex = untenureIndex - 1;
-
Invariants.checkState(tenure.end.compareTo(ranges[tenure.lastIndex].start()) >
0);
- Invariants.checkState(tenure.lastIndex + 1 == ranges.length ||
tenure.end.compareTo(ranges[tenure.lastIndex + 1].start()) <= 0);
+ Invariants.checkState(c.compare(tenure.end, accessor.start(ranges,
tenure.lastIndex)) > 0);
+ Invariants.checkState(tenure.lastIndex + 1 ==
accessor.size(ranges) || c.compare(tenure.end, accessor.start(ranges,
tenure.lastIndex + 1)) <= 0);
++directCount;
return untenureIndex - 1;
}
- private Tenured newTenured(RoutingKey end, int index)
+ private Tenured<Ranges, Range, RoutingKey> newTenured(RoutingKey end,
int index)
{
- Tenured result = reuse;
+ Tenured<Ranges, Range, RoutingKey> result = reuse;
if (result == null)
- return new Tenured(end, index);
+ return new Tenured<>(accessor, end, index);
reuse = result.next;
result.end = end;
@@ -846,10 +887,10 @@ public class SearchableRangeListBuilder
return result;
}
- private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex,
int length)
+ private Tenured<Ranges, Range, RoutingKey> addLinkEntry(RoutingKey
end, int index, int lastIndex, int length)
{
Invariants.checkArgument(index < 0);
- Tenured result = newTenured(end, index);
+ Tenured<Ranges, Range, RoutingKey> result = newTenured(end, index);
result.linkLength = length;
result.lastIndex = lastIndex;
add(result);
@@ -864,7 +905,7 @@ public class SearchableRangeListBuilder
{
while (!isEmpty() && first().lastIndex < index)
{
- Tenured removed = pollFirst();
+ Tenured<Ranges, Range, RoutingKey> removed = pollFirst();
// if removed.next == null, this is not referenced by a link
// if removed.next == removed, it is referenced by a link but
does not modify the link on removal
@@ -883,7 +924,7 @@ public class SearchableRangeListBuilder
// this case
// first clear the chain starting at the removed entry
- Tenured prev = removed, next = removed.next;
+ Tenured<Ranges, Range, RoutingKey> prev = removed, next =
removed.next;
while (next.next != null)
{
prev = next;
@@ -922,9 +963,9 @@ public class SearchableRangeListBuilder
* Write out any direct entries that are not pointed to by a chain
entry, and any chain entries;
* rollover any per-checkpoint data and free up for reuse discarded
Tenured objects
*/
- void rollover(PendingCheckpoint pending)
+ void rollover(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
{
- for (Tenured tenured : this)
+ for (Tenured<Ranges, Range, RoutingKey> tenured : this)
{
if (tenured.next == null)
pending.add(tenured);
@@ -946,12 +987,12 @@ public class SearchableRangeListBuilder
* known (i.e. when the next checkpoint is written) we re-process the list
to remove items we can now scan before
* serializing to checkpointListsBuf.
*/
- static class PendingCheckpoint
+ static class PendingCheckpoint<Ranges, Range, RoutingKey>
{
int atIndex = -1;
int count;
- Tenured[] contents = new Tenured[10];
+ Tenured<Ranges, Range, RoutingKey>[] contents = new Tenured[10];
int openDirectCount, firstOpenDirect, openIndirectCount;
boolean hasClosedDirect;
@@ -961,12 +1002,12 @@ public class SearchableRangeListBuilder
return count;
}
- Tenured get(int i)
+ Tenured<Ranges, Range, RoutingKey> get(int i)
{
return contents[i];
}
- void add(Tenured tenured)
+ void add(Tenured<Ranges, Range, RoutingKey> tenured)
{
if (contents.length == count)
contents = Arrays.copyOf(contents, 2 * contents.length);
@@ -994,7 +1035,7 @@ public class SearchableRangeListBuilder
for (int i = 0; i < maxi ; ++i)
{
- Tenured t = get(i);
+ Tenured<Ranges, Range, RoutingKey> t = get(i);
if (t.index >= 0)
{
if (t.index + scanDistance >= lastIndex)
@@ -1032,17 +1073,17 @@ public class SearchableRangeListBuilder
* Setup a link for referencing this chain later, if permitted.
* Must have at least two items, and at least as many direct records
as indirect
*/
- void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex)
+ void setupLinkChain(TenuredSet<Ranges, Range, RoutingKey> tenured, int
startIndex, int endIndex)
{
int minSizeToReference = openIndirectCount +
MIN_INDIRECT_LINK_LENGTH;
if (openDirectCount >= minSizeToReference)
{
int i = firstOpenDirect;
- Tenured prev = get(i++);
+ Tenured<Ranges, Range, RoutingKey> prev = get(i++);
while (openDirectCount > minSizeToReference)
{
- Tenured e = get(i++);
+ Tenured<Ranges, Range, RoutingKey> e = get(i++);
if (e.index < 0)
{
--minSizeToReference;
@@ -1057,7 +1098,7 @@ public class SearchableRangeListBuilder
while (i < count)
{
- Tenured next = get(i++);
+ Tenured<Ranges, Range, RoutingKey> next = get(i++);
if (next.index < 0)
continue;
@@ -1068,7 +1109,7 @@ public class SearchableRangeListBuilder
// may be more than one entry per item (though usually not)
int length = endIndex - startIndex;
- Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 |
startIndex, prev.lastIndex, length);
+ Tenured<Ranges, Range, RoutingKey> chainEntry =
tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length);
prev.next = chainEntry;
if (hasClosedDirect && (startIndex > 0xfffff || (length >
0xff)))
{
@@ -1085,8 +1126,8 @@ public class SearchableRangeListBuilder
public String toString()
{
return Arrays.stream(contents, 0, count)
- .map(Objects::toString)
- .collect(Collectors.joining(",", "[", "]"));
+ .map(Objects::toString)
+ .collect(Collectors.joining(",", "[", "]"));
}
}
}
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java
b/accord-core/src/main/java/accord/utils/RandomSource.java
index 067b872..ed971aa 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -29,6 +29,8 @@ import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import com.google.common.collect.Iterables;
+
import accord.utils.random.Picker;
// TODO (expected): merge with C* RandomSource
@@ -286,7 +288,13 @@ public interface RandomSource
return array[nextInt(offset, offset + length)];
}
- default <T extends Comparable<T>> T pick(Set<T> set)
+ default <T> T pick(NavigableSet<T> set)
+ {
+ int offset = nextInt(0, set.size());
+ return Iterables.get(set, offset);
+ }
+
+ default <T extends Comparable<? super T>> T pick(Set<T> set)
{
List<T> values = new ArrayList<>(set);
// Non-ordered sets may have different iteration order on different
environments, which would make a seed produce different histories!
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java
b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
index 22aeb3a..65e4465 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
@@ -18,17 +18,14 @@
package accord.utils;
-import accord.api.RoutingKey;
import accord.primitives.Range;
import accord.primitives.RoutableKey;
-import accord.utils.SearchableRangeListBuilder.Links;
-import accord.utils.SearchableRangeListBuilder.Strategy;
+import accord.utils.CheckpointIntervalArrayBuilder.Links;
+import accord.utils.CheckpointIntervalArrayBuilder.Strategy;
import net.nicoulaj.compilecommand.annotations.Inline;
-import java.util.*;
-
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
+import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS;
+import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE;
import static accord.utils.SortedArrays.Search.*;
/**
@@ -79,104 +76,13 @@ import static accord.utils.SortedArrays.Search.*;
* earlier checkpoints.
* </ul>
*/
-public class SearchableRangeList
+public class SearchableRangeList extends CheckpointIntervalArray<Range[],
Range, RoutableKey>
{
- // scan distance can be kept very small as we guarantee to use at most
linear extra space even with a scan distance of zero
- static final int MAX_SCAN_DISTANCE = 255;
- private static final int BIT30 = 0x40000000;
- private static final int BIT29 = 0x20000000;
-
private static final SearchableRangeList EMPTY_CHECKPOINTS = new
SearchableRangeList(new Range[0], new int[0], new int[] { 0, 0 }, new int[0],
0);
- final Range[] ranges;
-
- /**
- * The lower bound for each checkpoint.
- * The checkpoint {@code i} applies to all ranges (incl) starting from
{@code lowerBounds[i]},
- * but before (excl) {@code lowerBounds[i+1]}.
- */
- final int[] lowerBounds;
-
- /**
- * Logically one entry per checkpoint, mapping {@link #lowerBounds} to
{@link #checkpointLists},
- * however we also encode an additional byte per entry representing the
scan distance for the
- * ranges handled by this checkpoint. These are grouped into an integer
per four mappings, i.e.
- * we encode batches of five ints, with the first int containing the four
scan distances for the
- * next four checkpoints, and the following four ints containing the
respective offsets into
- * {@link #checkpointLists}.
- * <p>
- *
[0.........32b.........64b.........96b........128b........160b........192b]
- * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7
d8 ]
- */
- final int[] headers;
-
- /**
- * A list of indexes in {@link #ranges} contained by each checkpoint;
checkpoints are
- * mapped from {@link #lowerBounds} by {@link #headers}.
- * <p>
- * Entries are sorted in descending order by the end of the range they
cover, so that
- * a search of this collection may terminate as soon as it encounters a
range that does
- * not cover the item we are searching for.
- * <p>
- * This collection may contain negative values, in which case these point
to other
- * checkpoints, whose <i>direct</i> contents (i.e. the positive values of)
we may
- * search.
- * <ul> if negative, points to an earlier checkpoint, and:
- * <li>if the 30th bit is set, the low 20 bits point to checkpointsList,
- * and the 9 bits in-between provide the length of the range</li>
- * <li>otherwise, if the 29th bit is set, the lower 29 bits point to
checkpointsList,
- * and can be iterated safely without an endIndex</li>
- * <li>otherwise, the low 29 bits provide the length of the run, and the
low 31 bits
- * of the following entry (which will also be negative) provide a
pointer to
- * checkpointsList</li>
- * </ul>
- */
- final int[] checkpointLists;
-
- public final int maxScanAndCheckpointMatches;
-
- SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers,
int[] checkpointLists, int maxScanAndCheckpointMatches)
+ public SearchableRangeList(Range[] ranges, int[] lowerBounds, int[]
headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
{
- this.ranges = ranges;
- this.lowerBounds = lowerBounds;
- this.headers = headers;
- this.checkpointLists = checkpointLists;
- this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
- }
-
- @Inline
- public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1,
P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4>
forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
- {
- return forEach(range.start(), range.end(), forEachScanOrCheckpoint,
forEachRange, p1, p2, p3, p4, minIndex);
- }
-
- public <P1, P2, P3, P4> int forEach(RoutingKey startKey, RoutingKey
endKey, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint,
IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4
p4, int minIndex)
- {
- if (ranges.length == 0 || minIndex == ranges.length)
- return minIndex;
-
- int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length,
endKey, (a, b) -> a.compareTo(b.start()), CEIL);
- if (end < 0) end = -1 - end;
- if (end <= minIndex) return minIndex;
-
- int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length,
startKey, (a, b) -> a.compareTo(b.start()), CEIL);
- int start = floor;
- if (floor < 0)
- {
- // if there's no precise match on start, step backwards;
- // if this range does not overlap us, step forwards again for start
- // but retain the floor index for performing scan and checkpoint
searches from
- // as this contains all ranges that might overlap us (whereas
those that end
- // after us but before the next range's start would be missed by
the next range index)
- start = floor = -2 - floor;
- if (start < 0)
- start = floor = 0;
- else if (ranges[start].end().compareTo(startKey) <= 0)
- ++start;
- }
-
- // Since endInclusive() != startInclusive(), so no need to adjust
start/end comparisons
- return forEach(start, end, floor, startKey, 0,
forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds,
headers, checkpointLists, maxScanAndCheckpointMatches);
}
@Inline
@@ -209,95 +115,6 @@ public class SearchableRangeList
return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint,
forEachRange, p1, p2, p3, p4, minIndex);
}
- @Inline
- private <P1, P2, P3, P4> int forEach(int start, int end, int floor,
RoutableKey startBound, int cmpStartBoundWithEnd,
- IndexedQuadConsumer<P1, P2, P3, P4>
forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
- P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
- {
- if (start < minIndex) start = minIndex;
-
- // find the checkpoint array, so we know how far to step back
- int checkpoint = Arrays.binarySearch(lowerBounds, floor);
- if (checkpoint < 0) checkpoint = -2 - checkpoint;
- if (checkpoint < 0) return end;
-
- int headerBaseIndex = (checkpoint / 4) * 5;
- int headerSubIndex = checkpoint & 3;
- int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
-
- int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex))
& 0xff;
- int checkpointStart = headers[headerListIndex];
- int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4];
// skip the next header
-
- if (scanDistance == MAX_SCAN_DISTANCE)
- {
- scanDistance = -checkpointLists[checkpointStart++];
- Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
- }
-
- // NOTE: we visit in approximately ascending order, and this is a
requirement for correctness of RangeDeps builders
- // Only the checkpoint is visited in uncertain order, but it is
visited entirely, before the scan matches
- // and the range matches
- int minScanIndex = Math.max(floor - scanDistance, minIndex);
- for (int i = checkpointStart; i < checkpointEnd ; ++i)
- {
- int ri = checkpointLists[i];
- if (ri < 0)
- {
- int subStart, subEnd;
- if ((ri & BIT30) != 0)
- {
- subStart = ri & 0xfffff;
- subEnd = subStart + ((ri >>> 20) & 0x1ff);
- }
- else if ((ri & BIT29) != 0)
- {
- subStart = ri & 0x1fffffff;
- subEnd = Integer.MAX_VALUE;
- }
- else
- {
- int length = ri & 0x1fffffff;
- subStart = checkpointLists[++i];
- subEnd = subStart + length;
- }
-
- for (int j = subStart ; j < subEnd ; ++j)
- {
- ri = checkpointLists[j];
- if (ri < 0)
- continue;
-
- if (ranges[ri].end().compareTo(startBound) <=
cmpStartBoundWithEnd)
- break;
-
- if (ri >= minIndex && ri < minScanIndex)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
- }
- }
- else
- {
- // if startBound is key, we cannot be equal to it;
- // if startBound is a Range start, we also cannot be equal to
it due to the requirement that
- // endInclusive() != startInclusive(), so equality really
means inequality
- if (ranges[ri].end().compareTo(startBound) <=
cmpStartBoundWithEnd)
- break;
-
- if (ri >= minIndex && ri < minScanIndex)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
- }
- }
-
- for (int i = minScanIndex; i < floor ; ++i)
- {
- if (ranges[i].end().compareTo(startBound) > cmpStartBoundWithEnd)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
- }
-
- forEachRange.accept(p1, p2, p3, p4, start, end);
- return end;
- }
-
public static SearchableRangeList build(Range[] ranges)
{
if (ranges.length == 0)
diff --git
a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
index 77d351a..546ee16 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
@@ -18,1075 +18,84 @@
package accord.utils;
-import accord.api.RoutingKey;
-import accord.primitives.Range;
-import accord.utils.ArrayBuffers.IntBuffers;
+import java.util.Comparator;
-import javax.annotation.Nonnull;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
+import accord.primitives.Range;
+import accord.primitives.RoutableKey;
-import static accord.utils.ArrayBuffers.cachedInts;
import static accord.utils.SearchableRangeList.MAX_SCAN_DISTANCE;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
-import static accord.utils.SortedArrays.Search.CEIL;
/**
* Builder for {@link SearchableRangeList}
*/
-public class SearchableRangeListBuilder
+public class SearchableRangeListBuilder extends
CheckpointIntervalArrayBuilder<Range[], Range, RoutableKey>
{
- public enum Strategy
- {
- /**
- * Do not tenure any ranges that are scannable from the currently
in-effect max scan distance.
- * This means we probably do less work on construction, but that our
measure of the match count
- * at any point is inaccurate, and so our heuristics for when to write
checkpoints may be wrong,
- * leading to more checkpoints than necessary.
- */
- FAST,
-
- /**
- * Tenure every range covering more than goalScanDistance. Any within
max scan distance will also
- * update the scan distance, so that they will be filtered when a
checkpoint is written. But
- * in the meantime they permit accurate tracking of the number of
matches a query can return,
- * permitting our complexity calculations (that determine when
checkpoints should be written, and
- * what our maximum scan distance should be), to be accurate. This can
avoid bouncing between
- * two extremes, where a low max scan distance tenures and correctly
detects a desirable larger scan
- * distance, which we rollover and this prevents us tenuring and
tracking the number of matches, so
- * that we then pick a low max scan distance (and thereby also write a
new checkpoint)
- */
- ACCURATE
- }
-
- /**
- * Should we maintain pointers to prior checkpoints that we may reference
instead of reserializing
- * the remaining contents. This is cheap to visit as we stop enumerating
as soon as we encounter
- * an entry that no longer covers us. We use some simple heuristics when
deciding whether to do
- * this, namely that there are at least two entries (so we save one
checkpoint entry) and that
- * there is at least one direct entry for each indirect/link entry in the
range we will link.
- */
- public enum Links
- {
- LINKS,
- NO_LINKS
- }
-
- private static final int BIT31 = 0x80000000;
- private static final int BIT30 = 0x40000000;
- private static final int BIT29 = 0x20000000;
- static final int MIN_INDIRECT_LINK_LENGTH = 2;
-
- final boolean isAccurate;
- final boolean withLinks;
- final Range[] ranges;
-
- int[] bounds;
- int[] headers;
- int[] lists;
- int checkpointCount, headerPointer, listCount;
-
- final Scan scan = new Scan();
- final TenuredSet tenured = new TenuredSet();
- final PendingCheckpoint pending = new PendingCheckpoint();
-
- // track the maximum possible number of entries we can match with both a
scan + checkpoint lookup
- // this is an over-estimate and may be used by consumers to allocate
out-of-order buffers for visitations
- int maxScanAndCheckpointMatches;
-
- public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links
links)
- {
- this(ranges, Math.min(MAX_SCAN_DISTANCE, 34 -
Integer.numberOfLeadingZeros(ranges.length)), strategy, links);
- }
-
- public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance,
Strategy strategy, Links links)
- {
- this.isAccurate = strategy == ACCURATE;
- this.withLinks = links == LINKS;
- Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
- Invariants.checkArgument(goalScanDistance > 0);
- this.ranges = ranges;
- init(ranges, goalScanDistance);
- }
-
- void init(Range[] ranges, int goalScanDistance)
- {
- // we write checkpoints at least goalScanDistance apart
- scan.init(goalScanDistance);
- IntBuffers cachedInts = cachedInts();
- // ask for int buffers in descending order of size
- this.lists = cachedInts.getInts(ranges.length); // this one might need
to grow
- // +2 to round-up each division, and +2 to account for the final entry
(which might require an empty scan distance header)
- this.headers = cachedInts.getInts(((ranges.length / goalScanDistance)
* 5) / 4 + 4);
- this.bounds = cachedInts.getInts(ranges.length / goalScanDistance + 1);
- }
-
- /**
- * Walk over each range, looking ahead by {@link #maxScanDistance} to
decide if a range should
- * be tenured (written to a checkpoint) or scanned; the maximum scan
distance is determined by the
- * number of open tenured entries, i.e. the minimum number of results we
can expect to be returned
- * (or, if greater, the logarithm of the number of ranges in the
collection).
- * <p>
- * Once we encounter a range that should be tenured, either write a
checkpoint immediately
- * or make a note of the position we must scan to from the last entry in
this checkpoint
- * and wait until it is permitted to write a checkpoint. This range will
be tenured either
- * way for the following checkpoint.
- * <p>
- * The only reason not to write a checkpoint immediately is in the case we
would breach
- * our linear space complexity limit, which is imposed by ensuring we have
a space between
- * checkpoints at least as large as the number of entries written to the
last checkpoint,
- * discounted by the number of entries we have removed from the tenured
collection since
- * the last checkpoint.
- */
- public SearchableRangeList build()
- {
- for (int ri = 0 ; ri < ranges.length ; ++ri)
- {
- // write a checkpoint if we meet our linear space complexity
requirements
- // and we either have a tenured range that we must scan,
- // or the scan distance is now much larger than the minimum number
of search results
- if (shouldWriteCheckpoint(ri))
- writeCheckpoint(ri);
-
- // either tenure or update scan distance, potentially writing a
checkpoint
- tenureOrScan(ri);
- tenured.untenure(ri);
- }
-
- // write our final pending checkpoint
- writeCheckpoint(ranges.length);
- closeHeaders();
-
- IntBuffers cachedInts = cachedInts();
- int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
- int[] headers = cachedInts.completeAndDiscard(this.headers,
headerPointer);
- int[] bounds = cachedInts.completeAndDiscard(this.bounds,
checkpointCount);
- return new SearchableRangeList(ranges, bounds, headers, lists,
maxScanAndCheckpointMatches);
- }
-
- /**
- * Categorise the candidateIdx as either scannable, and if so update the
scan distance;
- * or unscannable, in which case add it to the {@link #tenured} collection.
- * Note, that in ACCURATE mode we tenure the item if it is outside of the
goalScanDistance
- * so we may track O(k) accurately above the O(lg2(N)) search and default
scan distance,
- * but we still update the scan distance so that the checkpoint will
exclude this entry.
- */
- private void tenureOrScan(int index)
- {
- Invariants.checkArgument(index >= 0);
-
- // then either migrate the index to pendingTenured, or ensure it will
be scanned
- RoutingKey end = ranges[index].end();
- int scanLimit = scanLimit(index, isAccurate ? scan.goal :
maxScanDistance());
- if (shouldTenure(end, scanLimit))
- {
- int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1);
- if (lastIndex - index > maxScanDistance()) scan.tenured(index);
- else if (!isAccurate) throw new IllegalStateException();
- else scan.updateScanDistance(index, lastIndex - index, this);
- }
- else
- {
- // TODO (low priority, efficiency): if the prior checkpoint has a
scan distance >= this one,
- // and <= 50% more than this one and there's no
scanMustReachIndex nor tenuredRanges, don't
- // write a new checkpoint (perhaps split shouldWriteCheckpoint
logic in two)
- scan.update(end, index, ranges, scanLimit, this);
- }
- }
-
- /**
- * We are forbidden from writing a checkpoint nearer than this to a prior
checkpoint.
- * This imposes our linear space complexity bounds, while not harming our
O(log2(N) + K)
- * complexity bounds, as we guarantee minimumSpan is never more than the
number of query
- * results.
- */
- private int minimumSpan()
- {
- return Math.max(scan.goal(), tenured.minimumSpan());
- }
-
- private int maxScanDistance()
- {
- // minimumSpan() reduces over time, but there is no reason to reduce
our scan distance
- // for tenuring below the scan distance we will write
- return Math.max(scan.watermark(), minimumSpan());
- }
-
- /**
- * The index after the last index we can scan from {@code atIndex} with at
most {@code maxScanDistance}.
- */
- private int scanLimit(int atIndex, int maxScanDistance)
- {
- return Math.min(1 + atIndex + maxScanDistance, ranges.length);
- }
-
- private boolean shouldTenure(RoutingKey end, int scanLimit)
- {
- return scanLimit < ranges.length &&
end.compareTo(ranges[scanLimit].start()) > 0;
- }
-
- private boolean canWriteCheckpoint(int atIndex)
- {
- return atIndex - pending.atIndex >= minimumSpan();
- }
-
- private boolean shouldWriteCheckpoint(int atIndex)
- {
- if (!canWriteCheckpoint(atIndex))
- return false;
-
- // TODO (desired, efficiency): consider these triggers
- if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance()))
- return true;
-
- return scan.hasMaybeDivergedFromMatchSize(tenured);
- }
-
- /**
- * Write a checkpoint for ranges[prevCheckpointIndex...ri)
- *
- * 1) Finalise the scan distance
- * 2) Write the header
- * 3) Filter the pending tenured ranges to remove those we can scan
- * 4) Write this list out
- * 5) Setup a link to this list, if it is large enough
- * 6) Rollover the scan, tenured and pending structures for the new
pending checkpoint
- */
- private void writeCheckpoint(int nextCheckpointIndex)
- {
- int lastIndex = nextCheckpointIndex - 1;
- int scanDistance = scan.finalise(lastIndex);
- scanDistance = extendScanDistance(lastIndex, scanDistance);
-
- if (pending.atIndex < 0)
- {
- // we don't have any checkpoints pending, so don't try to finalise
it
- // but if the new checkpoint doesn't cover index 0, insert a new
empty
- // checkpoint for the scan distance
- if (nextCheckpointIndex > 0)
- {
- // setup an initial empty checkpoint to store the first scan
distance
- maxScanAndCheckpointMatches = scanDistance;
- writeHeader(scanDistance, 0);
- }
- }
- else
- {
- writeHeader(scanDistance, pending.atIndex);
- int maxCheckpointMatchCount = pending.filter(scanDistance,
lastIndex);
- int listIndex = writeList(pending);
- if (withLinks)
- pending.setupLinkChain(tenured, listIndex, listCount);
- maxScanAndCheckpointMatches =
Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount);
- }
-
- savePendingCheckpointAndResetScanDistance(nextCheckpointIndex);
- }
-
- private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
- {
- // use the tail of checkpointListBuf to buffer ranges we plan to tenure
- ensureCapacity(tenured.count() + scan.watermark());
-
- scan.reset();
-
- if (isAccurate)
- {
- // TODO (low priority, efficiency): we can shift back the existing
scanDistance if it's far enough from
- // the next checkpoint. this might permit us to skip some
comparisons
- scan.resetPeakMax(tenured);
- for (Tenured tenured : this.tenured)
- {
- int distanceToEnd = (tenured.lastIndex - checkpointIndex);
- if (distanceToEnd >= scan.peakMax)
- break;
-
- int scanDistance = tenured.lastIndex - tenured.index;
- if (scanDistance <= scan.peakMax)
- scan.updateScanDistance(tenured.index, scanDistance, null);
- }
-
- if (scan.watermark() < scan.goal)
- {
- int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
- while (ri < checkpointIndex)
- {
- RoutingKey end = ranges[ri].end();
- int scanLimit = scanLimit(ri, scan.peakMax);
- if (!shouldTenure(end, scanLimit))
- scan.update(end, ri, ranges, scanLimit, null);
- ++ri;
- }
- }
- }
- else
- {
- // the maximum scan distance that could ever have been adopted for
last chunk
- int oldPeakMax = scan.peakMax();
- // the minimum scan distance we will start with for processing the
proceeding ranges
- // note: this may increase if we decide to tenure additional
ranges, at which point it will be the actual newPeakMax
- int newMinPeakMax = scan.newPeakMax(tenured);
- int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex,
tenured);
- int minScanIndex = Scan.minScanIndex(checkpointIndex,
newMinPeakMax);
-
- // we now make sure tenured and scan are correct for the new
parameters.
- // 1) if our peakMax is lower then we need to go back and find
items to tenure that we previously marked for scanning
- // 2) we must also reset our scan distances
-
- // since our peakMax is determined by tenured.count(), but we are
tenuring items here we keep things simple
- // and do not account for those items we tenure but would later
permit to scan as our peakMax grows
-
- int ri = Math.min(minUntenuredIndex, minScanIndex);
- while (ri < checkpointIndex)
- {
- RoutingKey end = ranges[ri].end();
- int newPeakMax = scan.newPeakMax(tenured);
- int scanLimit = scanLimit(ri, newPeakMax);
- if (shouldTenure(end, scanLimit))
- {
- // note: might have already been tenured
- // in this case our untenureLimit may be incorrect, but we
won't use it
- if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
- tenured.tenure(end, ri, ranges, scanLimit + 1,
scanLimit(ri, oldPeakMax));
- }
- else
- {
- // this might effectively remove a previously tenured item
- scan.update(end, ri, ranges, scanLimit, null);
- }
- ++ri;
- }
-
- scan.resetPeakMax(tenured);
- }
-
- pending.atIndex = checkpointIndex;
- pending.clear();
- tenured.rollover(pending);
- }
-
- private int extendScanDistance(int lastIndex, int scanDistance)
- {
- // now we've established our lower bound on scan distance, see how
many checkpoints we can remove
- // by increasing our scan distance so that it remains proportional to
the number of results returned
- // TODO (low priority, efficiency): can reduce cost here by using
scanDistances array for upper bounds to scan distance
- int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(),
tenured.countAtPrevCheckpoint());
- if (maxScanDistance >= 1 + scanDistance + scanDistance/4 &&
pending.count() >= (maxScanDistance - scanDistance)/2)
- {
- int removeCount = 0;
- int extendedScanDistance = scanDistance;
- int target = (maxScanDistance - scanDistance)/2;
- for (int i = 0 ; i < pending.count() ; ++i)
- {
- Tenured t = pending.get(i);
- if (t.index < 0)
- continue;
-
- int distance = Math.min(lastIndex, t.lastIndex) - t.index;
- if (distance <= scanDistance)
- continue; // already scanned or untenured
-
- if (distance <= maxScanDistance)
- {
- ++removeCount;
- extendedScanDistance = Math.max(extendedScanDistance,
distance);
- if (extendedScanDistance == maxScanDistance && removeCount
>= target)
- break;
- }
- }
-
- // TODO (low priority, efficiency): should perhaps also gate this
decision on the span we're covering
- // algorithmically, however, so long as we are under
maxScanDistance we are fine
- if (removeCount >= (extendedScanDistance - scanDistance)/2)
- scanDistance = extendedScanDistance;
- }
- return scanDistance;
- }
-
- int writeList(PendingCheckpoint pending)
+ public static final Accessor<Range[], Range, RoutableKey> RANGE_ACCESSOR =
new Accessor<>()
{
- int startIndex = listCount;
- for (int i = pending.count() - 1 ; i >= 0 ; --i)
- {
- Tenured t = pending.get(i);
- if (t.index >= 0)
- {
- lists[listCount++] = t.index;
- }
- else
- {
- int index = t.index & ~BIT31;
- int length = t.linkLength & ~BIT31;
- if (length <= 0xff && index <= 0xfffff)
- {
- lists[listCount++] = BIT31 | BIT30 | (length << 20) |
index;
- }
- else if (t.linkLength >= 0 && length < BIT30)
- {
- lists[listCount++] = BIT31 | BIT29 | index;
- }
- else
- {
- lists[listCount++] = BIT31 | length;
- lists[listCount++] = BIT31 | pending.count();
- }
- }
- }
- return startIndex;
- }
-
- void writeHeader(int scanDistance, int lowerBound)
- {
- int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE);
-
- if ((checkpointCount & 3) == 0)
- headers[headerPointer++] = headerScanDistance;
- else
- headers[headerPointer - (1 + (checkpointCount & 3))] |=
headerScanDistance << (8 * (checkpointCount & 3));
-
- bounds[checkpointCount++] = lowerBound;
- headers[headerPointer++] = listCount;
-
- if (scanDistance >= MAX_SCAN_DISTANCE)
- lists[listCount++] = -scanDistance; // serialize as a negative
value so we ignore it in most cases automatically
- }
-
- void closeHeaders()
- {
- // write our final checkpoint header
- if ((checkpointCount & 3) == 0) headers[headerPointer++] = 0;
- headers[headerPointer++] = listCount;
- }
-
- void ensureCapacity(int maxPendingSize)
- {
- if (listCount + maxPendingSize >= lists.length)
- lists = cachedInts().resize(lists, listCount, lists.length +
lists.length/2 + maxPendingSize);
- }
-
- static class Scan
- {
- /** the scan distance we are aiming for; should be proportional to
log2(N) */
- int goal;
-
- /** the indexes at which we increased the scan distance, and the new
scan distance */
- int[] distances = new int[16];
- /** the number of unique scan distances we have adopted since the last
checkpoint */
- int count;
- /** the highest scan distance we have adopted
(==scanDistance(scanDistanceCount-1)) */
- int watermark;
- /**
- * the first index we have tenured a range from, but for which we did
not immediately write a new checkpoint
- * we *must* scan at least from the last index in the checkpoint to
here
- */
- int scanTenuredAtIndex = -1;
-
- /** The maximum (i.e. initial) scan distance limit we have used since
the last attempted checkpoint write */
- int peakMax;
-
- void init(int goalScanDistance)
- {
- goal = peakMax = goalScanDistance;
- }
-
- private void update(RoutingKey end, int atIndex, Range[] ranges, int
scanLimit, SearchableRangeListBuilder checkpoint)
- {
- int newScanDistance = find(end, atIndex, ranges, scanLimit,
watermark);
- updateScanDistance(atIndex, newScanDistance, checkpoint);
- }
-
- private void updateScanDistance(int atIndex, int newScanDistance,
SearchableRangeListBuilder checkpoint)
- {
- if (newScanDistance > watermark)
- {
- // TODO (desired, efficiency): we don't mind slight increases
to the watermark;
- // should really look at scan distance history and ensure we
haven't e.g. doubled since
- // some earlier point (and should track the match count +
scan distance at each bump
- // to check overall work hasn't increased too much)
- if (checkpoint != null &&
checkpoint.canWriteCheckpoint(atIndex))
- checkpoint.writeCheckpoint(atIndex);
-
- watermark = newScanDistance;
- if (count * 2 == distances.length)
- distances = Arrays.copyOf(distances, distances.length * 2);
- distances[count * 2] = newScanDistance;
- distances[count * 2 + 1] = atIndex;
- ++count;
- }
- }
-
- private int find(RoutingKey end, int atIndex, Range[] ranges, int
scanLimit, int currentScanDistance)
- {
- int lowerIndex = SortedArrays.exponentialSearch(ranges, atIndex +
currentScanDistance, scanLimit, end, (e, s) -> e.compareTo(s.start()), CEIL);
- if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
- else lowerIndex -= 1;
- return lowerIndex - atIndex;
- }
-
- boolean isAboveGoal()
- {
- return watermark > goal;
- }
-
- int watermark()
- {
- return watermark;
- }
-
- int goal()
- {
- return goal;
- }
-
- int distanceToTenured(int lastIndex)
- {
- return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex :
0;
- }
-
- boolean mustCheckpointToScanTenured(int checkpointIndex, int
maxScanDistance)
- {
- return scanTenuredAtIndex >= 0 && checkpointIndex -
scanTenuredAtIndex >= maxScanDistance;
- }
-
- /**
- * Are we scanning a much longer distance than the minimum number of
matches we know a query will return?
- * Note: with Strategy.FAST, {@code tenured.count()} gets less
accurate as scan distance increases, so this
- * will bounce around triggering checkpoints due to the larger scan
distance, resetting the scan distance
- * and starting again
- */
- boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured)
- {
- return isAboveGoal() && tenured.count() < watermark()/2;
- }
-
- private int distance(int i)
- {
- return distances[i*2];
- }
-
- private int index(int i)
- {
- return distances[i*2+1];
- }
-
- int finalise(int lastIndex)
- {
- Invariants.checkState(distanceToTenured(lastIndex) <=
Math.max(watermark(), peakMax()));
-
- int scanDistance = watermark;
- // then, compute the minimum scan distance implied by any tenured
ranges we did not immediately
- // write a checkpoint for - we *must* scan back as far as this
record
- int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex -
scanTenuredAtIndex : 0;
- if (minScanDistance > scanDistance)
- {
- // if this minimum is larger than the largest scan distance we
picked up for non-tenured ranges
- // then we are done, as there's nothing we can save
- scanDistance = minScanDistance;
- }
- else if (scanDistance > 0)
- {
- // otherwise, we can look to see if any of the scan distances
we computed overflow the checkpoint,
- // i.e. where no records served by this checkpoint need to
scan the full distance to reach it
- int distanceToLastScanIndex = lastIndex - index(count -1);
- // if the distance to the last scan index is larger than its
scan distance, we have overflowed;
- if (distanceToLastScanIndex < scanDistance)
- {
- minScanDistance = Math.max(distanceToLastScanIndex,
minScanDistance);
- // loop until we find one that doesn't overflow, as this
is another minimum scan distance
- int i = count - 1;
- while (--i >= 0)
- {
- int distance = lastIndex - index(i);
- if (distance >= distance(i)) break;
- else if (distance > minScanDistance) minScanDistance =
distance;
- }
- if (i >= 0) scanDistance = Math.max(minScanDistance,
distance(i));
- else scanDistance = minScanDistance;
- }
- }
-
- return scanDistance;
- }
-
- void reset()
- {
- // we could in theory reset our scan distance using the contents
of scanDistance[]
- // but it's a bit complicated, as we want to have the first item
to increment the scan distance
- // so that we can use it in writeScanDistance to shrink the scan
distance;
- // jumping straight to the highest scan distance breaks this
- count = 0;
- scanTenuredAtIndex = -1;
- watermark = 0;
- }
-
- void resetPeakMax(TenuredSet tenured)
- {
- peakMax = newPeakMax(tenured);
- }
-
- int peakMax()
- {
- return peakMax;
- }
-
- int newPeakMax(TenuredSet tenured)
+ @Override
+ public int size(Range[] ranges)
{
- return Math.max(goal, tenured.count());
+ return ranges.length;
}
- /**
- * The minimum index containing a range that might need to be tenured,
if we have a smaller max scan distance than before
- */
- int minUntenuredIndex(int checkpointIndex, TenuredSet tenured)
+ @Override
+ public Range get(Range[] ranges, int index)
{
- int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) -
watermark());
- // the maximum scan distance that cxould ever have been adopted
for the ranges processed since last checkpoint
- int oldPeakMax = peakMax;
- int newMinPeakMax = newPeakMax(tenured);
- if (newMinPeakMax < oldPeakMax)
- {
- // minimise range we unnecessarily re-tenure over
- // TODO (low priority, efficiency): see if can also use to
reduce range we re-scan e.g. can recycle
- // scanDistances contents if we know we won't need to step
back further at next checkpoint
- for (int i = count - 1; i >= 0 ; --i)
- {
- if (index(i) <= minUntenuredIndex)
- break;
- if (distance(i) <= newMinPeakMax)
- return i + 1 == count ? index(i) : index(i + 1) - 1;
- }
- }
- return minUntenuredIndex;
+ return ranges[index];
}
- /**
- * Record that a range at this index has been tenured, so that we can
track how far back
- * we need to scan to determine how long we can defer writing a new
checkpoint while still
- * being able to scan it.
- *
- * TODO (low priority, efficiency): when a checkpoint is written, we
should consider moving it
- * earlier if the scan distance is increased primarily because of
this index, and the tenured
- * collection is otherwise unchanged (so can be written with minimal
overhead)
- */
- void tenured(int atIndex)
+ @Override
+ public RoutableKey start(Range[] ranges, int index)
{
- if (scanTenuredAtIndex < 0)
- scanTenuredAtIndex = atIndex;
+ return ranges[index].start();
}
- static int minScanIndex(int checkpointIndex, int scanDistance)
+ @Override
+ public RoutableKey start(Range range)
{
- return Math.max(0, (checkpointIndex - 1) - scanDistance);
+ return range.start();
}
@Override
- public String toString()
+ public RoutableKey end(Range[] ranges, int index)
{
- return "Scan{watermark=" + watermark + ", tenured=" +
scanTenuredAtIndex + '}';
+ return ranges[index].end();
}
- }
-
- /**
- * Record-keeping for a range we have decided is not scannable
- */
- static class Tenured implements Comparable<Tenured>
- {
- /**
- * The end of the tenured range covered by the contents referred to be
{@link #index}
- */
- RoutingKey end;
-
- /**
- * <ul>
- * <li>If positive, this points to {@code ranges[index]}</li>
- * <li>If negative, this points to an entry in {@link #lists};
- * see {@link SearchableRangeList#checkpointLists}</li>
- * </ul>
- */
- int index;
-
- /**
- * The last index in {@link #ranges} covered by this tenured range
- */
- int lastIndex;
- /**
- * set when this record is serialized in a checkpoint list to either:
- * <ul>
- * <li>point to itself, in which case no action should be
- * taken on removal (it is only retained for size bookkeeping);
or</li>
- * <li>point to the next item in the checkpoint list; the first
- * such element removed triggers the clearing of the checkpoint
- * list so that its entries are re-inserted in the next
checkpoint</li>
- * </ul>
- */
- Tenured next;
-
- /**
- * Only set for link entries, i.e. where {@code index < 0}.
- * <ul>
- * <li>if positive, the length is optional as we will terminate safely
using the end bound filtering</li>
- * <li>if negative, the low 31 bits <b>must</b> be retrieved as the
length for safe iteration</li>
- * </ul>
- */
- int linkLength;
-
- Tenured(RoutingKey end, int index)
+ @Override
+ public RoutableKey end(Range range)
{
- this.end = end;
- this.index = index;
+ return range.end();
}
@Override
- public int compareTo(@Nonnull Tenured that)
+ public Comparator<RoutableKey> keyComparator()
{
- int c = this.end.compareTo(that.end);
- // we sort indexes in reverse order so later tenured items find
the earlier ones with same end when searching
- // for higher entries for the range of indexes to search, and
- if (c == 0) c = -Integer.compare(this.index, that.index);
- return c;
+ return Comparator.naturalOrder();
}
@Override
- public String toString()
+ public int binarySearch(Range[] ranges, int from, int to, RoutableKey
find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search
op)
{
- return "Tenured{end=" + end + ", index=" + index + '}';
+ return SortedArrays.binarySearch(ranges, from, to, find,
comparator, op);
}
- }
+ };
- /**
- * The set of ranges that we intend to write to checkpoints that remain
open at the current point in the iteration
- * This collection may be filtered before serialization, but every member
will be visited either by scanning
- * or visiting the checkpoint list
- * TODO (low priority, efficiency): save garbage by using an
insertion-sorted array for collections where
- * this is sufficient. later, introduce a mutable b-tree supporting
object recycling. we would also like
- * to use a collection that permits us to insert and return a finger into
the tree so we can find the
- * successor as part of insertion, and that permits constant-time first()
calls
- */
- static class TenuredSet extends TreeSet<Tenured>
+ public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links
links)
{
- /**
- * the number of direct tenured entries (i.e. ignoring link entries)
- * this is used to provide a minimum bound on the number of results a
range query can return
- * note: with Strategy.FAST this gets less accurate as the span
distance increases
- */
- int directCount;
- int directCountAtPrevCheckpoint;
- int minSpan;
-
- // a stack of recently used EndAndIndex objects - used only for the
duration of a single build
- Tenured reuse, pendingReuse, pendingReuseTail;
-
- int count()
- {
- return directCount;
- }
-
- int countAtPrevCheckpoint()
- {
- return directCountAtPrevCheckpoint;
- }
-
- /**
- * We require a checkpoint to cover a distance at least as large as
the number of tenured ranges leftover
- * since the prior checkpoint, to ensure these require at most linear
additional space, while not requiring
- * more than O(k) additional complexity on search (i.e., we will scan
a number of elements at most equal
- * to the number we have to visit in the checkpoint).
- *
- * We achieve this by recording the minimum number of match results as
of the prior checkpoint (i.e. {@link #count()})
- * and discounting it by one each time we untenure a range, so that
for each tenured range from the prior checkpoint
- * we have either untenured a range or processed at least one
additional input.
- */
- int minimumSpan()
- {
- return minSpan;
- }
-
- private int tenure(RoutingKey end, int index, Range[] ranges, int
minUntenureIndex)
- {
- return tenure(newTenured(end, index), ranges, minUntenureIndex,
ranges.length);
- }
-
- private void tenure(RoutingKey end, int index, Range[] ranges, int
minUntenureIndex, int untenureLimit)
- {
- tenure(newTenured(end, index), ranges, minUntenureIndex,
untenureLimit);
- }
-
- private int tenure(Tenured tenure, Range[] ranges, int
untenureMinIndex, int untenureLimit)
- {
- if (!add(tenure))
- return tenure.lastIndex;
-
- Tenured next = higher(tenure);
- if (next != null)
- untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
- int untenureIndex = SortedArrays.binarySearch(ranges,
untenureMinIndex, untenureLimit, tenure.end, (e, s) -> e.compareTo(s.start()),
CEIL);
- if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
- tenure.lastIndex = untenureIndex - 1;
-
Invariants.checkState(tenure.end.compareTo(ranges[tenure.lastIndex].start()) >
0);
- Invariants.checkState(tenure.lastIndex + 1 == ranges.length ||
tenure.end.compareTo(ranges[tenure.lastIndex + 1].start()) <= 0);
- ++directCount;
- return untenureIndex - 1;
- }
-
- private Tenured newTenured(RoutingKey end, int index)
- {
- Tenured result = reuse;
- if (result == null)
- return new Tenured(end, index);
-
- reuse = result.next;
- result.end = end;
- result.index = index;
- result.lastIndex = 0;
- result.next = null;
- return result;
- }
-
- private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex,
int length)
- {
- Invariants.checkArgument(index < 0);
- Tenured result = newTenured(end, index);
- result.linkLength = length;
- result.lastIndex = lastIndex;
- add(result);
- return result;
- }
-
- /**
- * Retire any active tenured ranges that no longer cover the pointer
into ranges;
- * if this crosses our checkpoint threshold, write a new checkpoint.
- */
- void untenure(int index)
- {
- while (!isEmpty() && first().lastIndex < index)
- {
- Tenured removed = pollFirst();
-
- // if removed.next == null, this is not referenced by a link
- // if removed.next == removed, it is referenced by a link but
does not modify the link on removal
- if (removed.next != null && removed.next != removed)
- {
- // this is a member of a link's chain, which may serve one
of two purposes:
- // 1) it may be the entry nominated to invalidate the
link, due to the link
- // membership shrinking below the required threshold;
in which case we
- // must clear the chain to reactivate its members for
insertion into the
- // next checkpoint, and remove the chain link itself
- // 2) it may be nominated as an entry to update the chain
link info, to make
- // it more succinct: if every entry of the chain
remains active, and there
- // are *many* entries then we need two integers to
represent the chain, but
- // as soon as any entry is invalid we can rely on this
entry to terminate
- // iteration, so we update the bookkeeping on the first
entry we remove in
- // this case
-
- // first clear the chain starting at the removed entry
- Tenured prev = removed, next = removed.next;
- while (next.next != null)
- {
- prev = next;
- next = next.next;
- prev.next = null;
- }
- Invariants.checkState(next.index < 0);
- if (prev.end == next.end)
- {
- // if this is the last entry in the link, the link is
expired and should be removed/reused
- remove(next);
- if (pendingReuseTail == null)
- pendingReuseTail = next;
- next.next = pendingReuse;
- pendingReuse = next;
- }
- else if (next.linkLength < 0)
- {
- // otherwise, flag the link as safely consumed without
knowing the length
- next.linkLength = next.linkLength & Integer.MAX_VALUE;
- }
- }
-
- // this was not a link reference; update our bookkeeping and
save it for reuse
- Invariants.checkState(removed.index >= 0);
- --directCount;
- --minSpan;
- if (pendingReuseTail == null)
- pendingReuseTail = removed;
- removed.next = pendingReuse;
- pendingReuse = removed;
- }
- }
-
- /**
- * Write out any direct entries that are not pointed to by a chain
entry, and any chain entries;
- * rollover any per-checkpoint data and free up for reuse discarded
Tenured objects
- */
- void rollover(PendingCheckpoint pending)
- {
- for (Tenured tenured : this)
- {
- if (tenured.next == null)
- pending.add(tenured);
- }
- // make freed Tenured objects available for reuse
- if (pendingReuse != null)
- {
- pendingReuseTail.next = reuse;
- reuse = pendingReuse;
- pendingReuseTail = pendingReuse = null;
- }
- directCountAtPrevCheckpoint = minSpan = directCount;
- }
+ super(RANGE_ACCESSOR, ranges, strategy, links);
}
- /**
- * we write checkpoints out before knowing the scan distance needed for
the range, as a checkpoint precedes
- * the ranges it covers; so we record the position and contents of the
checkpoint, and once the scan distance is
- * known (i.e. when the next checkpoint is written) we re-process the list
to remove items we can now scan before
- * serializing to checkpointListsBuf.
- */
- static class PendingCheckpoint
+ public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance,
Strategy strategy, Links links)
{
- int atIndex = -1;
- int count;
-
- Tenured[] contents = new Tenured[10];
-
- int openDirectCount, firstOpenDirect, openIndirectCount;
- boolean hasClosedDirect;
-
- int count()
- {
- return count;
- }
-
- Tenured get(int i)
- {
- return contents[i];
- }
-
- void add(Tenured tenured)
- {
- if (contents.length == count)
- contents = Arrays.copyOf(contents, 2 * contents.length);
- contents[count++] = tenured;
- }
-
- void clear()
- {
- count = 0;
- }
-
- /**
- * Remove pending entries that will be scanned by the scanDistance,
and update
- * our bookkeeping for creating links
- */
- int filter(int scanDistance, int lastIndex)
- {
- int matchCountModifier = 0;
- int maxi = count;
- count = 0;
- openDirectCount = 0;
- openIndirectCount = 0;
- firstOpenDirect = -1;
-// lastClosedDirect = -1;
-
- for (int i = 0; i < maxi ; ++i)
- {
- Tenured t = get(i);
- if (t.index >= 0)
- {
- if (t.index + scanDistance >= lastIndex)
- continue; // last index will find it with a scan
-
- if (t.lastIndex <= t.index + scanDistance)
- continue; // all indexes will find it with a scan
-
- if (t.lastIndex > lastIndex)
- {
- // this range remains open for the next checkpoint;
- // we may want to reference this list from there
- // so track count and position of first one to make a
determination
- ++openDirectCount;
- if (firstOpenDirect < 0) firstOpenDirect = count;
- }
- else hasClosedDirect = true;
- }
- else
- {
- // note: we over count here, as we count pointers within
the chain
- matchCountModifier += (t.linkLength & Integer.MAX_VALUE) -
1; // (subtract 1 to discount the pointer)
- if (t.lastIndex > lastIndex)
- ++openIndirectCount;
- }
-
- if (i == count) ++count;
- else contents[count++] = t;
- }
-
- return count + matchCountModifier;
- }
-
- /**
- * Setup a link for referencing this chain later, if permitted.
- * Must have at least two items, and at least as many direct records
as indirect
- */
- void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex)
- {
- int minSizeToReference = openIndirectCount +
MIN_INDIRECT_LINK_LENGTH;
- if (openDirectCount >= minSizeToReference)
- {
- int i = firstOpenDirect;
- Tenured prev = get(i++);
-
- while (openDirectCount > minSizeToReference)
- {
- Tenured e = get(i++);
- if (e.index < 0)
- {
- --minSizeToReference;
- continue;
- }
-
- Invariants.checkState(prev.next == null);
- prev.next = prev;
- prev = e;
- --openDirectCount;
- }
-
- while (i < count)
- {
- Tenured next = get(i++);
- if (next.index < 0)
- continue;
-
- Invariants.checkState(prev.next == null);
- prev.next = next;
- prev = next;
- }
-
- // may be more than one entry per item (though usually not)
- int length = endIndex - startIndex;
- Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 |
startIndex, prev.lastIndex, length);
- prev.next = chainEntry;
- if (hasClosedDirect && (startIndex > 0xfffff || (length >
0xff)))
- {
- // TODO (expected, testing): make sure this is tested, as
not a common code path (may never be executed in normal operation)
- // we have no closed ranges so iteration needs to know the
end bound, but we cannot encode our bounds cheaply
- // so link the first bound to the chain entry, so that on
removal it triggers an update of endIndex to note
- // that it can be iterated safely without an end bound
- get(firstOpenDirect).next = chainEntry;
- }
- }
- }
+ super(RANGE_ACCESSOR, ranges, goalScanDistance, strategy, links);
+ Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
+ }
- @Override
- public String toString()
- {
- return Arrays.stream(contents, 0, count)
- .map(Objects::toString)
- .collect(Collectors.joining(",", "[", "]"));
- }
+ @Override
+ public SearchableRangeList build()
+ {
+ return build((ranges, bounds, headers, lists,
maxScanAndCheckpointMatches) ->
+ new SearchableRangeList(ranges, bounds, headers, lists,
maxScanAndCheckpointMatches));
}
}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 579bbf9..a18f782 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -831,6 +831,18 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
}
}
+ public static <V> V getUnchecked(AsyncChain<V> chain)
+ {
+ try
+ {
+ return getUninterruptibly(chain);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public static void awaitUninterruptibly(AsyncChain<?> chain)
{
try
diff --git a/accord-core/src/main/java/accord/utils/random/Picker.java
b/accord-core/src/main/java/accord/utils/random/Picker.java
index f6f07d7..272f17e 100644
--- a/accord-core/src/main/java/accord/utils/random/Picker.java
+++ b/accord-core/src/main/java/accord/utils/random/Picker.java
@@ -26,6 +26,18 @@ import accord.utils.RandomSource;
public class Picker
{
+ public static float[] randomWeights(RandomSource random, int length)
+ {
+ float[] weights = new float[length - 1];
+ float sum = 0;
+ for (int i = 0 ; i < weights.length ; ++i)
+ weights[i] = sum += random.nextFloat();
+ sum += random.nextFloat();
+ for (int i = 0 ; i < weights.length ; ++i)
+ weights[i] /= sum;
+ return weights;
+ }
+
static abstract class Weighted
{
final RandomSource random;
@@ -33,7 +45,7 @@ public class Picker
public Weighted(RandomSource random, int length)
{
- this(random, randomWeights(random, length));
+ this(random, Picker.randomWeights(random, length));
}
public Weighted(RandomSource random, float[] weights)
@@ -42,17 +54,6 @@ public class Picker
this.weights = weights;
}
- static float[] randomWeights(RandomSource random, int length)
- {
- float[] weights = new float[length - 1];
- float sum = 0;
- for (int i = 0 ; i < weights.length ; ++i)
- weights[i] = sum += random.nextFloat();
- sum += random.nextFloat();
- for (int i = 0 ; i < weights.length ; ++i)
- weights[i] /= sum;
- return weights;
- }
static float[] randomWeights(RandomSource random, float[] bias)
{
@@ -104,7 +105,7 @@ public class Picker
public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource
random, T[] values)
{
- return new WeightedObjectPicker<>(random, values,
randomWeights(random, values.length));
+ return new WeightedObjectPicker<>(random, values,
Picker.randomWeights(random, values.length));
}
public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource
random, T[] values, float[] bias)
diff --git a/accord-core/src/test/java/accord/utils/Gen.java
b/accord-core/src/test/java/accord/utils/Gen.java
index af86340..04b6e63 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -99,6 +99,21 @@ public interface Gen<A> {
return Stream.generate(() -> next(rs));
}
+ interface Int2IntMapFunction
+ {
+ int applyAsInt(RandomSource rs, int value);
+ }
+
+ interface Int2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, int value);
+ }
+
+ interface Long2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, long value);
+ }
+
interface IntGen extends Gen<Integer>
{
int nextInt(RandomSource random);
@@ -114,6 +129,16 @@ public interface Gen<A> {
return r -> fn.applyAsInt(nextInt(r));
}
+ default IntGen mapAsInt(Int2IntMapFunction fn)
+ {
+ return r -> fn.applyAsInt(r, nextInt(r));
+ }
+
+ default LongGen mapAsLong(Int2LongMapFunction fn)
+ {
+ return r -> fn.applyAsLong(r, nextInt(r));
+ }
+
default Gen.IntGen filterAsInt(IntPredicate fn)
{
return rs -> {
@@ -159,6 +184,11 @@ public interface Gen<A> {
return r -> fn.applyAsLong(nextLong(r));
}
+ default LongGen mapAsLong(Long2LongMapFunction fn)
+ {
+ return r -> fn.applyAsLong(r, nextLong(r));
+ }
+
default Gen.LongGen filterAsLong(LongPredicate fn)
{
return rs -> {
diff --git a/accord-core/src/test/java/accord/utils/Gens.java
b/accord-core/src/test/java/accord/utils/Gens.java
index 35e4567..3723fc6 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -35,8 +35,12 @@ import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
import java.util.stream.Stream;
+import accord.utils.random.Picker;
+
public class Gens {
private Gens() {
}
@@ -51,6 +55,27 @@ public class Gens {
return ignore -> constant.get();
}
+ public static <T> Gen<T> oneOf(Gen<T>... gens)
+ {
+ return oneOf(Arrays.asList(gens));
+ }
+
+ public static <T> Gen<T> oneOf(List<Gen<T>> gens)
+ {
+ return rs -> rs.pick(gens).next(rs);
+ }
+
+ public static <T> Gen<T> oneOf(Map<Gen<T>, Integer> values)
+ {
+ Gen<Gen<T>> gen = pick(values);
+ return rs -> gen.next(rs).next(rs);
+ }
+
+ public static Gen.IntGen pickInt(int... ts)
+ {
+ return rs -> ts[rs.nextInt(0, ts.length)];
+ }
+
public static <T> Gen<T> pick(T... ts)
{
return pick(Arrays.asList(ts));
@@ -116,6 +141,256 @@ public class Gens {
};
}
+ public static Gen.LongGen pickZipf(long[] array)
+ {
+ if (array == null || array.length == 0)
+ throw new IllegalArgumentException("Empty array given");
+ if (array.length == 1)
+ return ignore -> array[0];
+ BigDecimal[] weights = new BigDecimal[array.length];
+ BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length));
+ weights[0] = base;
+ for (int i = 1; i < array.length; i++)
+ weights[i] = base.divide(BigDecimal.valueOf(i + 1),
RoundingMode.UP);
+ BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO,
BigDecimal::add);
+
+ return rs -> {
+ BigDecimal value =
BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+ for (int i = 0; i < weights.length; i++)
+ {
+ value = value.subtract(weights[i]);
+ if (value.compareTo(BigDecimal.ZERO) <= 0)
+ return array[i];
+ }
+ return array[array.length - 1];
+ };
+ }
+
+ public static <T> Gen<T> pickZipf(T... array)
+ {
+ return pickZipf(Arrays.asList(array));
+ }
+
+ public static <T> Gen<T> pickZipf(List<T> array)
+ {
+ if (array == null || array.isEmpty())
+ throw new IllegalArgumentException("Empty array given");
+ if (array.size() == 1)
+ return ignore -> array.get(0);
+ BigDecimal[] weights = new BigDecimal[array.size()];
+ BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.size()));
+ weights[0] = base;
+ for (int i = 1; i < array.size(); i++)
+ weights[i] = base.divide(BigDecimal.valueOf(i + 1),
RoundingMode.UP);
+ BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO,
BigDecimal::add);
+
+ return rs -> {
+ BigDecimal value =
BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+ for (int i = 0; i < weights.length; i++)
+ {
+ value = value.subtract(weights[i]);
+ if (value.compareTo(BigDecimal.ZERO) <= 0)
+ return array.get(i);
+ }
+ return array.get(array.size() - 1);
+ };
+ }
+
+ public static Gen<Gen.IntGen> randomWeights(int[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static Gen<Gen.LongGen> randomWeights(long[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static <T> Gen<Gen<T>> randomWeights(T[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static <T> Gen<Gen<T>> randomWeights(List<T> array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.size());
+ return r -> array.get(index(r, weights));
+ };
+ }
+
+ private static int index(RandomSource rs, float[] weights)
+ {
+ int i = Arrays.binarySearch(weights, rs.nextFloat());
+ if (i < 0) i = -1 - i;
+ return i;
+ }
+
+ public static Gen<Gen.IntGen> mixedDistribution(int minInclusive, int
maxExclusive)
+ {
+ int domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" +
minInclusive + ", max=" + maxExclusive);
+ int[] array, indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ int delta = domainSize / numBuckets;
+ array = new int[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = IntStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ return r -> r.nextInt(minInclusive, maxExclusive);
+ case 1: // median biased
+ int median = rs.nextInt(minInclusive, maxExclusive);
+ return r -> r.nextBiasedInt(minInclusive, median,
maxExclusive);
+ case 2: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> {
+ int start = array[index];
+ int end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextInt(start, end);
+ });
+ case 3: // random weight
+ if (indexes == null)
+ return randomWeights(array).next(rs);
+ return randomWeights(indexes).next(rs).mapAsInt((r, index)
-> {
+ int start = array[index];
+ int end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextInt(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static int[] reverseAndCopy(int[] array)
+ {
+ array = Arrays.copyOf(array, array.length);
+ for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid;
i++, j--)
+ {
+ int tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
+ return array;
+ }
+
+ public static Gen<Gen.LongGen> mixedDistribution(long minInclusive, long
maxExclusive)
+ {
+ long domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" +
minInclusive + ", max=" + maxExclusive);
+ long[] array;
+ int[] indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ long delta = domainSize / numBuckets;
+ array = new long[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = LongStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ return r -> r.nextLong(minInclusive, maxExclusive);
+ case 1: // median biased
+ long median = rs.nextLong(minInclusive, maxExclusive);
+ return r -> r.nextBiasedLong(minInclusive, median,
maxExclusive);
+ case 2: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> {
+ long start = array[index];
+ long end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextLong(start, end);
+ });
+ case 3: // random weight
+ if (indexes == null)
+ return randomWeights(array).next(rs);
+ return randomWeights(indexes).next(rs).mapAsLong((r,
index) -> {
+ long start = array[index];
+ long end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextLong(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static long[] reverseAndCopy(long[] array)
+ {
+ array = Arrays.copyOf(array, array.length);
+ for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid;
i++, j--)
+ {
+ long tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
+ return array;
+ }
+
+ public static <T> Gen<Gen<T>> mixedDistribution(T... list)
+ {
+ return mixedDistribution(Arrays.asList(list));
+ }
+
+ /**
+ * Builds a generator of generators over {@code list}: each draw from the outer
+ * generator selects one of four picking strategies (uniform, median biased,
+ * zipf over the list or a reversed copy, or random weights) and returns a
+ * generator that applies it.
+ */
+ public static <T> Gen<Gen<T>> mixedDistribution(List<T> list)
+ {
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ // draw from the source given to the returned generator (r), not the
+ // captured outer source (rs), matching the int/long overloads
+ return r -> list.get(r.nextInt(0, list.size()));
+ case 1: // median biased
+ int median = rs.nextInt(0, list.size());
+ return r -> list.get(r.nextBiasedInt(0, median, list.size()));
+ case 2: // zipf
+ List<T> array = list;
+ if (rs.nextBoolean())
+ {
+ array = new ArrayList<>(list);
+ Collections.reverse(array);
+ }
+ return pickZipf(array);
+ case 3: // random weight
+ return randomWeights(list).next(rs);
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
public static Gen<char[]> charArray(Gen.IntGen sizes, char[] domain)
{
return charArray(sizes, domain, (a, b) -> true);
@@ -240,6 +515,28 @@ public class Gens {
}
};
}
+
+ public Gen<Gen<Boolean>> mixedDistribution()
+ {
+ return rs -> {
+ int selection = rs.nextInt(0, 4);
+ switch (selection)
+ {
+ case 0: // uniform 50/50
+ return r -> r.nextBoolean();
+ case 1: // variable frequency
+ var freq = rs.nextFloat();
+ return r -> r.decide(freq);
+ case 2: // fixed result
+ boolean result = rs.nextBoolean();
+ return ignore -> result;
+ case 3: // biased repeating runs
+ return biasedRepeatingRuns(rs.nextDouble(),
rs.nextInt(1, 100));
+ default:
+ throw new IllegalStateException("Unexpected int for
bool selection: " + selection);
+ }
+ };
+ }
}
public static class IntDSL
@@ -265,6 +562,11 @@ public class Gens {
return r -> r.nextInt(min, max);
return r -> r.nextInt(min, max + 1);
}
+
+ public Gen<Gen.IntGen> mixedDistribution(int minInclusive, int
maxExclusive)
+ {
+ return Gens.mixedDistribution(minInclusive, maxExclusive);
+ }
}
public static class LongDSL {
@@ -296,6 +598,11 @@ public class Gens {
return pick(klass.getEnumConstants());
}
+ /**
+ * Builds a mixedDistribution over every constant of the given enum class, as
+ * reported by klass.getEnumConstants().
+ */
+ public <T extends Enum<T>> Gen<Gen<T>> allMixedDistribution(Class<T>
klass)
+ {
+ return mixedDistribution(klass.getEnumConstants());
+ }
+
public <T extends Enum<T>> Gen<T> allWithWeights(Class<T> klass,
int... weights)
{
T[] constants = klass.getEnumConstants();
diff --git a/accord-core/src/test/java/accord/utils/Property.java
b/accord-core/src/test/java/accord/utils/Property.java
index 9c81375..50eba0c 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -23,12 +23,16 @@ import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
public class Property
@@ -75,6 +79,7 @@ public class Property
public T withTimeout(Duration timeout)
{
this.timeout = timeout;
+ this.pure = false;
return (T) this;
}
@@ -168,7 +173,10 @@ public class Property
}
try
{
- return value.toString();
+ String result = value.toString();
+ if (result != null && result.length() > 100 && value instanceof
Collection)
+ result = ((Collection<?>) value).stream().map(o -> "\n\t "
+ o).collect(Collectors.joining(",", "[", "]"));
+ return result;
}
catch (Throwable t)
{
@@ -176,7 +184,7 @@ public class Property
}
}
- private static String propertyError(Common<?> input, Throwable cause,
Object... values)
+ private static StringBuilder propertyErrorCommon(Common<?> input,
Throwable cause)
{
StringBuilder sb = new StringBuilder();
// return "Seed=" + seed + "\nExamples=" + examples;
@@ -194,15 +202,33 @@ public class Property
msg = cause.getClass().getCanonicalName();
sb.append(msg).append('\n');
}
+ return sb;
+ }
+
+ private static String propertyError(Common<?> input, Throwable cause,
Object... values)
+ {
+ StringBuilder sb = propertyErrorCommon(input, cause);
if (values != null)
{
sb.append("Values:\n");
for (int i = 0; i < values.length; i++)
- sb.append('\t').append(i).append(" =
").append(normalizeValue(values[i])).append('\n');
+ sb.append('\t').append(i).append(" =
").append(normalizeValue(values[i])).append(": ").append(values[i] == null ?
"unknown type" : values[i].getClass().getCanonicalName()).append('\n');
}
return sb.toString();
}
+    /**
+     * Builds the failure message for a stateful property check: the common header
+     * from {@code propertyErrorCommon}, the configured step count, the model state
+     * with its class name, and one line per recorded command.
+     *
+     * @param input   the builder whose check failed
+     * @param cause   the failure being reported
+     * @param state   the model state at the time of failure; may be null when the
+     *                failure happened before the state was generated
+     * @param history descriptions of the commands recorded before the failure, in order
+     * @return the formatted message
+     */
+    private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List<String> history)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
+        sb.append("Steps: ").append(input.steps).append('\n');
+        sb.append("Values:\n");
+        // Render the state through normalizeValue, as propertyError does for its values:
+        // it guards toString() with a catch, so a state whose toString() throws cannot
+        // replace the failure that is being reported.
+        sb.append("\tState: ").append(normalizeValue(state)).append(": ").append(state == null ? "unknown type" : state.getClass().getCanonicalName()).append('\n');
+        sb.append("\tHistory:").append('\n');
+        for (var event : history)
+            sb.append("\t\t").append(event).append('\n');
+        return sb.toString();
+    }
+
public interface FailingConsumer<A>
{
void accept(A value) throws Exception;
@@ -380,4 +406,116 @@ public class Property
{
return new ForBuilder();
}
+
+ /**
+ * Entry point for stateful property checks: returns a new StatefulBuilder.
+ * That class starts with 500 examples and 1000 steps per example.
+ */
+ public static StatefulBuilder stateful()
+ {
+ return new StatefulBuilder();
+ }
+
+    /**
+     * Builder for stateful property checks. For every example a model state and a
+     * system under test are created, then up to {@code steps} randomly generated
+     * commands are applied to the model and run against the system, with each
+     * command's postconditions checked.
+     */
+    public static class StatefulBuilder extends Common<StatefulBuilder>
+    {
+        /** Number of commands executed per example. */
+        protected int steps = 1000;
+
+        /** Starts with 500 examples; the step count defaults to 1000. */
+        public StatefulBuilder()
+        {
+            examples = 500;
+        }
+
+        /**
+         * Sets how many commands are executed per example.
+         *
+         * @param steps number of commands per example
+         * @return this builder
+         */
+        public StatefulBuilder withSteps(int steps)
+        {
+            this.steps = steps;
+            return this;
+        }
+
+        /**
+         * Runs the configured number of examples against {@code commands}.
+         *
+         * Per example: generate the initial state, create the system under test, then
+         * for each step draw a command (redrawing while its precondition is not
+         * {@code Ok}), record its description, apply it to the state, run it against
+         * the system, and check its postconditions. The system and the state are
+         * destroyed afterwards, in that order.
+         *
+         * @param commands source of states, systems under test and commands
+         * @throws PropertyError wrapping any failure, with the state and the command
+         *                       history included in the message
+         */
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        public <State, SystemUnderTest> void check(Commands<State, SystemUnderTest> commands)
+        {
+            RandomSource rs = new DefaultRandom(seed);
+            for (int i = 0; i < examples; i++)
+            {
+                State state = null;
+                List<String> history = new ArrayList<>(steps);
+                try
+                {
+                    checkInterrupted();
+
+                    state = commands.genInitialState().next(rs);
+                    // Nested try/finally so the state is destroyed even when createSut or
+                    // destroySut throws; previously both cases leaked the state.
+                    try
+                    {
+                        SystemUnderTest sut = commands.createSut(state);
+                        try
+                        {
+                            for (int j = 0; j < steps; j++)
+                            {
+                                Gen<Command<State, SystemUnderTest, ?>> cmdGen = commands.commands(state);
+                                Command cmd = cmdGen.next(rs);
+                                // redraw until a command passes its precondition; give up on the 42nd failure
+                                for (int a = 0; cmd.checkPreconditions(state) != PreCheckResult.Ok && a < 42; a++)
+                                {
+                                    if (a == 41)
+                                        throw new IllegalArgumentException("Unable to find next command");
+                                    cmd = cmdGen.next(rs);
+                                }
+                                // record before executing so a failing command appears in the report
+                                history.add(cmd.detailed(state));
+                                Object stateResult = cmd.apply(state);
+                                cmd.checkPostconditions(state, stateResult,
+                                                        sut, cmd.run(sut));
+                            }
+                        }
+                        finally
+                        {
+                            commands.destroySut(sut);
+                        }
+                    }
+                    finally
+                    {
+                        commands.destroyState(state);
+                    }
+                }
+                catch (Throwable t)
+                {
+                    throw new PropertyError(statefulPropertyError(this, t, state, history), t);
+                }
+                if (pure)
+                {
+                    // re-seed between examples and remember the new seed
+                    // NOTE(review): presumably so the seed shown on failure replays only the failing example - confirm
+                    seed = rs.nextLong();
+                    rs.setSeed(seed);
+                }
+            }
+        }
+    }
+
+ // Result of Command.checkPreconditions. StatefulBuilder.check only executes a
+ // command whose result is Ok; any other value makes it draw a replacement command.
+ public enum PreCheckResult { Ok, Ignore }
+ /**
+ * One step of a stateful property check, executed by StatefulBuilder.check against
+ * both the model State and the SystemUnderTest.
+ *
+ * checkPreconditions: evaluated before the command is used; check() redraws a
+ * command while this is not Ok. The default accepts every state.
+ * apply: executes the command against the model state; check() passes the result
+ * to checkPostconditions as "expected".
+ * run: executes the command against the system under test; check() passes the
+ * result to checkPostconditions as "actual".
+ * checkPostconditions: called after apply and run; the default does nothing.
+ * detailed: text that check() records in the history before calling apply; the
+ * default is toString().
+ */
+ public interface Command<State, SystemUnderTest, Result>
+ {
+ default PreCheckResult checkPreconditions(State state) {return
PreCheckResult.Ok;}
+ Result apply(State state) throws Throwable;
+ Result run(SystemUnderTest sut) throws Throwable;
+ default void checkPostconditions(State state, Result expected,
+ SystemUnderTest sut, Result actual)
throws Throwable {}
+ default String detailed(State state) {return this.toString();}
+ }
+
+ /**
+ * Command variant for steps that produce no result (Result is fixed to Void).
+ * Implementors provide applyUnit and runUnit; the inherited apply and run call
+ * them and return null.
+ */
+ public interface UnitCommand<State, SystemUnderTest> extends
Command<State, SystemUnderTest, Void>
+ {
+ // executes the command against the model state
+ void applyUnit(State state) throws Throwable;
+ // executes the command against the system under test
+ void runUnit(SystemUnderTest sut) throws Throwable;
+
+ // adapts applyUnit to Command.apply; always returns null
+ @Override
+ default Void apply(State state) throws Throwable
+ {
+ applyUnit(state);
+ return null;
+ }
+
+ // adapts runUnit to Command.run; always returns null
+ @Override
+ default Void run(SystemUnderTest sut) throws Throwable
+ {
+ runUnit(sut);
+ return null;
+ }
+ }
+
+ /**
+ * Describes a stateful property for StatefulBuilder.check.
+ *
+ * genInitialState: generator that check() draws one model state from per example.
+ * createSut: builds the system under test for that state.
+ * destroyState / destroySut: cleanup hooks that check() calls from a finally block
+ * after the steps of an example; both default to no-ops.
+ * commands: called by check() before every step with the current state; returns
+ * the generator the next command is drawn from.
+ */
+ public interface Commands<State, SystemUnderTest>
+ {
+ Gen<State> genInitialState() throws Throwable;
+ SystemUnderTest createSut(State state) throws Throwable;
+ default void destroyState(State state) throws Throwable {}
+ default void destroySut(SystemUnderTest sut) throws Throwable {}
+ Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws
Throwable;
+ }
}
diff --git
a/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
new file mode 100644
index 0000000..d31c95c
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import accord.impl.IntKey;
+import accord.primitives.Range;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+class SearchableRangeListTest
+{
+    /**
+     * Builds a list from the 1000 ranges {@code IntKey.range(i, i + 1)} and queries it
+     * with shrinking ranges. For each query the callbacks are tallied (one per
+     * single-index callback, {@code end - start + 1} per span callback) and the
+     * total must equal {@code rangeEnd - rangeStart + 1}.
+     */
+    @Test
+    public void fullWorld()
+    {
+        int numRanges = 1000;
+        Range[] world = new Range[numRanges];
+        for (int i = 0; i < numRanges; i++)
+            world[i] = IntKey.range(i, i + 1);
+
+        SearchableRangeList list = SearchableRangeList.build(world);
+        BiConsumer<Integer, Integer> test = (rangeStart, rangeEnd) -> {
+            // single-element array used as a mutable counter inside the callbacks
+            int[] tally = { 0 };
+            list.forEach(IntKey.range(rangeStart, rangeEnd), (p1, p2, p3, p4, index) -> {
+                tally[0]++;
+            }, (p1, p2, p3, p4, from, to) -> {
+                tally[0] += (to - from + 1);
+            }, 0, 0, 0, 0, 0);
+            Assertions.assertThat(tally[0]).isEqualTo(rangeEnd - rangeStart + 1);
+        };
+        // move the lower bound up towards the end of the world
+        for (int lower = 0; lower < numRanges; lower++)
+            test.accept(lower, numRanges);
+        // pull the upper bound down towards the start of the world
+        for (int shrink = 0; shrink < numRanges; shrink++)
+            test.accept(0, numRanges - shrink);
+    }
+
+    /**
+     * Builds a list from randomly generated ranges sorted by start, then runs 1000
+     * random queries per example. Every query's visited ranges, collected from both
+     * callbacks, must equal the ranges found by a linear scan using
+     * {@code compareIntersecting(r) == 0}.
+     */
+    @Test
+    public void random()
+    {
+        qt().check(rs -> {
+            int numRanges = rs.nextInt(1000, 10000);
+            List<Range> ranges = new ArrayList<>(numRanges);
+            while (ranges.size() < numRanges)
+            {
+                int start = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+                int offset = rs.nextInt(1, 1000);
+                ranges.add(IntKey.range(start, start + offset));
+            }
+            ranges.sort(Comparator.comparing(Range::start));
+
+            SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0]));
+            for (int attempt = 0; attempt < 1000; attempt++)
+            {
+                Range query;
+                int selection = rs.nextInt(0, 3);
+                if (selection == 0)
+                {
+                    // an existing range, picked at random
+                    query = rs.pick(ranges);
+                }
+                else if (selection == 1)
+                {
+                    // a fresh random range, generated the same way as the inputs
+                    int rangeStart = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+                    int width = rs.nextInt(1, 1000);
+                    query = IntKey.range(rangeStart, rangeStart + width);
+                }
+                else if (selection == 2)
+                {
+                    // from the start of one existing range to the end of a later (or the same) one
+                    int first = rs.nextInt(0, ranges.size());
+                    int last = first + rs.nextInt(0, (ranges.size() - first));
+                    IntKey lower = (IntKey) ranges.get(first).start();
+                    IntKey upper = (IntKey) ranges.get(last).end();
+                    query = IntKey.range(lower.key, upper.key);
+                }
+                else
+                {
+                    throw new IllegalStateException("Unhandled value");
+                }
+
+                // linear-scan reference answer
+                List<Range> expected = new ArrayList<>();
+                for (int k = 0; k < ranges.size(); k++)
+                {
+                    Range candidate = ranges.get(k);
+                    if (query.compareIntersecting(candidate) == 0)
+                        expected.add(candidate);
+                }
+
+                List<Range> actual = new ArrayList<>(expected.size());
+                list.forEach(query, (p1, p2, p3, p4, idx) -> {
+                    actual.add(list.ranges[idx]);
+                }, (p1, p2, p3, p4, from, to) -> {
+                    int cursor = from;
+                    while (cursor < to)
+                    {
+                        actual.add(list.ranges[cursor]);
+                        cursor++;
+                    }
+                }, 0, 0, 0, 0, 0);
+
+                Assertions.assertThat(actual).isEqualTo(expected);
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle
b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c0..32cd21b 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -29,7 +29,7 @@ repositories {
}
compileJava {
- sourceCompatibility = JavaVersion.VERSION_1_8
+ sourceCompatibility = JavaVersion.VERSION_11
dependsOn(':rat')
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]