http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 69520b0..a9f455a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -135,6 +135,7 @@ import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; @@ -2272,8 +2273,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * Verifies that sequences exist and reserves values for them if reserveValues is true */ @Override - public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { - incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action); + public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { + incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action); } /** @@ -2286,14 +2287,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * */ @Override - public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, 
SQLException[] exceptions) throws SQLException { - incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE); + public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { + incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE); } @SuppressWarnings("deprecation") - private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException { - List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size()); - for (SequenceKey key : keys) { + private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException { + List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size()); + for (SequenceAllocation sequenceAllocation : sequenceAllocations) { + SequenceKey key = sequenceAllocation.getSequenceKey(); Sequence newSequences = new Sequence(key); Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); if (sequence == null) { @@ -2312,11 +2314,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement for (int i = 0; i < sequences.size(); i++) { Sequence sequence = sequences.get(i); try { - values[i] = sequence.incrementValue(timestamp, op); + values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations()); } catch (EmptySequenceCacheException e) { indexes[toIncrementList.size()] = i; toIncrementList.add(sequence); - Increment inc = sequence.newIncrement(timestamp, op); + Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations()); incrementBatch.add(inc); } catch (SQLException e) { exceptions[i] = e; @@ -2355,7 +2357,8 @@ public 
class ConnectionQueryServicesImpl extends DelegateQueryServices implement Sequence sequence = toIncrementList.get(i); Result result = (Result)resultObjects[i]; try { - values[indexes[i]] = sequence.incrementValue(result, op); + long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE)); + values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate); } catch (SQLException e) { exceptions[indexes[i]] = e; }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 4d582be..3fa0c1e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -66,6 +66,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceAlreadyExistsException; import org.apache.phoenix.schema.SequenceInfo; import org.apache.phoenix.schema.SequenceKey; @@ -385,13 +386,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple } @Override - public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { + public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, + long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { int i = 0; - for (SequenceKey key : sequenceKeys) { - SequenceInfo info = sequenceMap.get(key); + for (SequenceAllocation sequenceAllocation : sequenceAllocations) { + SequenceInfo info = sequenceMap.get(sequenceAllocation.getSequenceKey()); if (info == null) { - exceptions[i] = new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName()); + exceptions[i] = new SequenceNotFoundException(sequenceAllocation.getSequenceKey().getSchemaName(), 
sequenceAllocation.getSequenceKey().getSequenceName()); } else { values[i] = info.sequenceValue; } @@ -400,10 +401,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple } @Override - public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions) throws SQLException { + public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, + long[] values, SQLException[] exceptions) throws SQLException { int i = 0; - for (SequenceKey key : sequenceKeys) { + for (SequenceAllocation sequenceAllocation : sequenceAllocations) { + SequenceKey key = sequenceAllocation.getSequenceKey(); SequenceInfo info = sequenceMap.get(key); if (info == null) { exceptions[i] = new SequenceNotFoundException( @@ -429,7 +431,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple i = 0; for (SQLException e : exceptions) { if (e != null) { - sequenceMap.remove(sequenceKeys.get(i)); + sequenceMap.remove(sequenceAllocations.get(i).getSequenceKey()); } i++; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 2a98cd5..4153652 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -42,6 +42,7 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SequenceAllocation; import 
org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.PTableStats; @@ -183,15 +184,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } @Override - public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { - getDelegate().validateSequences(sequenceKeys, timestamp, values, exceptions, action); + public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, + long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { + getDelegate().validateSequences(sequenceAllocations, timestamp, values, exceptions, action); } @Override - public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions) throws SQLException { - getDelegate().incrementSequences(sequenceKeys, timestamp, values, exceptions); + public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, + long[] values, SQLException[] exceptions) throws SQLException { + getDelegate().incrementSequences(sequenceAllocations, timestamp, values, exceptions); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 9e74d2a..9d0c1aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1255,7 +1255,7 @@ public class MetaDataClient { dataTable.getTimeStamp()); long[] seqValues = new long[1]; SQLException[] sqlExceptions = new SQLException[1]; - 
connection.getQueryServices().incrementSequences(Collections.singletonList(key), + connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)), Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions); if (sqlExceptions[0] != null) { throw sqlExceptions[0]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java index aeba58b..adca5e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java @@ -62,7 +62,7 @@ import com.google.common.math.LongMath; public class Sequence { public static final int SUCCESS = 0; - public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE}; + public enum ValueOp {VALIDATE_SEQUENCE, INCREMENT_SEQUENCE}; public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE}; // create empty Sequence key values used while created a sequence row @@ -144,7 +144,7 @@ public class Sequence { return value.isDeleted ? 
null : value; } - private long increment(SequenceValue value, ValueOp op) throws SQLException { + private long increment(SequenceValue value, ValueOp op, long numToAllocate) throws SQLException { boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE; // check if the the sequence has already reached the min/max limit if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) { @@ -165,7 +165,8 @@ public class Sequence { boolean overflowOrUnderflow=false; // advance currentValue while checking for overflow try { - value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy); + // advance by numToAllocate * the increment amount + value.currentValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy); } catch (ArithmeticException e) { overflowOrUnderflow = true; } @@ -180,18 +181,92 @@ public class Sequence { return returnValue; } - public long incrementValue(long timestamp, ValueOp op) throws SQLException { + public long incrementValue(long timestamp, ValueOp op, long numToAllocate) throws SQLException { SequenceValue value = findSequenceValue(timestamp); if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } - if (value.currentValue == value.nextValue) { + + if (isSequenceCacheExhausted(numToAllocate, value)) { if (op == ValueOp.VALIDATE_SEQUENCE) { return value.currentValue; } throw EMPTY_SEQUENCE_CACHE_EXCEPTION; - } - return increment(value, op); + } + return increment(value, op, numToAllocate); + } + + /** + * This method first checks whether value.currentValue == value.nextValue; this check is what + * determines whether we need to refresh the cache when evaluating NEXT VALUE FOR. Once + * current value reaches the next value we know the cache is exhausted as we give sequence + * values out one at a time. 
+ * + * However, for bulk allocations, evaluated by {@code NEXT <n> VALUE FOR}, we need a different check; + * see {@link #isSequenceCacheExhaustedForBulkAllocation(long, SequenceValue)}. + * + * Using the bulk allocation check for both cases to determine whether the cache is exhausted + * works in most cases; however, when dealing with CYCLEs, overflow and underflow, things + * break down due to things like sign changes that can happen if we overflow from a positive to + * a negative number and vice versa. Therefore, both checks are left in place. + * + */ + private boolean isSequenceCacheExhausted(final long numToAllocate, final SequenceValue value) throws SQLException { + return value.currentValue == value.nextValue || (SequenceUtil.isBulkAllocation(numToAllocate) && isSequenceCacheExhaustedForBulkAllocation(numToAllocate, value)); + } + + /** + * This method checks whether there are sufficient values in the SequenceValue + * cached on the client to allocate the requested number of slots. It handles + * decreasing and increasing sequences as well as any overflows or underflows + * encountered. 
+ */ + private boolean isSequenceCacheExhaustedForBulkAllocation(final long numToAllocate, final SequenceValue value) throws SQLException { + long targetSequenceValue; + + performValidationForBulkAllocation(numToAllocate, value); + + try { + targetSequenceValue = LongMath.checkedAdd(value.currentValue, numToAllocate * value.incrementBy); + } catch (ArithmeticException e) { + // Perform a CheckedAdd to make sure if over/underflow + // We don't treat this as the cache being exhausted as the current value may be valid in the case + // of no cycle, logic in increment() will take care of detecting we've hit the limit of the sequence + return false; + } + + if (value.incrementBy > 0) { + return targetSequenceValue > value.nextValue; + } else { + return targetSequenceValue < value.nextValue; + } + } + + /** + * @throws SQLException with the correct error code if sequence limit is reached with + * this request for allocation or we attempt to perform a bulk allocation on a sequence + * with cycles. + */ + private void performValidationForBulkAllocation(final long numToAllocate, final SequenceValue value) + throws SQLException { + boolean increasingSeq = value.incrementBy > 0 ? 
true : false; + + // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true + // Check for this here so we fail on expression evaluation and don't allow corner case + // whereby a client requests less than cached number of slots on sequence with cycle to succeed + if (value.cycle && !SequenceUtil.isCycleAllowed(numToAllocate)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED) + .setSchemaName(key.getSchemaName()) + .setTableName(key.getSequenceName()) + .build().buildException(); + } + + if (SequenceUtil.checkIfLimitReached(value.currentValue, value.minValue, value.maxValue, value.incrementBy, value.cacheSize, numToAllocate)) { + throw new SQLExceptionInfo.Builder(SequenceUtil.getLimitReachedErrorCode(increasingSeq)) + .setSchemaName(key.getSchemaName()) + .setTableName(key.getSequenceName()) + .build().buildException(); + } } public List<Append> newReturns() { @@ -249,7 +324,7 @@ public class Sequence { return key; } - public long incrementValue(Result result, ValueOp op) throws SQLException { + public long incrementValue(Result result, ValueOp op, long numToAllocate) throws SQLException { // In this case, we don't definitely know the timestamp of the deleted sequence, // but we know anything older is likely deleted. Worse case, we remove a sequence // from the cache that we shouldn't have which will cause a gap in sequence values. 
@@ -270,19 +345,21 @@ public class Sequence { .build().buildException(); } // If we found the sequence, we update our cache with the new value - SequenceValue value = new SequenceValue(result, op); + SequenceValue value = new SequenceValue(result, op, numToAllocate); insertSequenceValue(value); - return increment(value, op); + return increment(value, op, numToAllocate); } + @SuppressWarnings("deprecation") - public Increment newIncrement(long timestamp, Sequence.ValueOp action) { + public Increment newIncrement(long timestamp, Sequence.ValueOp action, long numToAllocate) { Increment inc = new Increment(key.getKey()); // It doesn't matter what we set the amount too - we always use the values we get // from the Get we do to prevent any race conditions. All columns that get added // are returned with their current value try { inc.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp); + inc.setAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE, Bytes.toBytes(numToAllocate)); } catch (IOException e) { throw new RuntimeException(e); // Impossible } @@ -413,7 +490,7 @@ public class Sequence { return this.incrementBy == 0; } - public SequenceValue(Result r, ValueOp op) { + public SequenceValue(Result r, ValueOp op, long numToAllocate) { KeyValue currentValueKV = getCurrentValueKV(r); KeyValue incrementByKV = getIncrementByKV(r); KeyValue cacheSizeKV = getCacheSizeKV(r); @@ -429,8 +506,12 @@ public class Sequence { this.cycle = (Boolean) PBoolean.INSTANCE.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(), cycleKV.getValueLength()); this.limitReached = false; currentValue = nextValue; + if (op != ValueOp.VALIDATE_SEQUENCE) { - currentValue -= incrementBy * cacheSize; + // We can't just take the max of numToAllocate and cacheSize + // We need to handle a valid edgecase where a client requests bulk allocation of + // a number of slots that are less than cache size of the sequence + currentValue -= incrementBy * (SequenceUtil.isBulkAllocation(numToAllocate) ? 
numToAllocate : cacheSize); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java new file mode 100644 index 0000000..afb4a20 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java @@ -0,0 +1,48 @@ +package org.apache.phoenix.schema; + +/** + * A SequenceKey and the number of slots requested to be allocated for the sequence. + * It binds these two together to allow operations such as sorting + * a Collection of SequenceKeys and at the same time preserving the associated requested + * number of slots to allocate. + * + * This class bases hashCode, equals and compareTo solely on the wrapped {@link SequenceKey}; + * the requested number of allocations is ignored, and equals unwraps the other SequenceAllocation's key. + */ +public class SequenceAllocation implements Comparable<SequenceAllocation> { + + private final SequenceKey sequenceKey; + private final long numAllocations; + + public SequenceAllocation(SequenceKey sequenceKey, long numAllocations) { + this.sequenceKey = sequenceKey; + this.numAllocations = numAllocations; + } + + + public SequenceKey getSequenceKey() { + return sequenceKey; + } + + + public long getNumAllocations() { + return numAllocations; + } + + + @Override + public int hashCode() { + return sequenceKey.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof SequenceAllocation && sequenceKey.equals(((SequenceAllocation) obj).sequenceKey); + } + + @Override + public int compareTo(SequenceAllocation that) { + return sequenceKey.compareTo(that.sequenceKey); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java index f97d565..acf1864 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java @@ -16,6 +16,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.SequenceInfo; +import com.google.common.base.Preconditions; import com.google.common.math.LongMath; /** @@ -23,17 +24,24 @@ import com.google.common.math.LongMath; */ public class SequenceUtil { + public static final long DEFAULT_NUM_SLOTS_TO_ALLOCATE = 1L; + /** - * Returns the nextValue of a sequence - * @throws SQLException if cycle is false and the sequence limit has been reached + * @return true if the limit of the sequence has been reached. */ public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue, - long incrementBy, long cacheSize) throws SQLException { + long incrementBy, long cacheSize, long numToAllocate) { long nextValue = 0; boolean increasingSeq = incrementBy > 0 ? 
true : false; // advance currentValue while checking for overflow try { - long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize); + long incrementValue; + if (isBulkAllocation(numToAllocate)) { + // For bulk allocation we increment independent of cache size + incrementValue = LongMath.checkedMultiply(incrementBy, numToAllocate); + } else { + incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize); + } nextValue = LongMath.checkedAdd(currentValue, incrementValue); } catch (ArithmeticException e) { return true; @@ -46,9 +54,28 @@ public class SequenceUtil { } return false; } + + public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue, + long incrementBy, long cacheSize) throws SQLException { + return checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE); + } public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException { - return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize); + return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE); + } + + /** + * Returns true if the value of numToAllocate signals that a bulk allocation of sequence slots + * was requested. Prevents proliferation of same comparison in many places throughout the code. 
+ */ + public static boolean isBulkAllocation(long numToAllocate) { + Preconditions.checkArgument(numToAllocate > 0); + return numToAllocate > DEFAULT_NUM_SLOTS_TO_ALLOCATE; + } + + public static boolean isCycleAllowed(long numToAllocate) { + return !isBulkAllocation(numToAllocate); + } /** @@ -59,5 +86,15 @@ public class SequenceUtil { return new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName) .build().buildException(); } + + /** + * Returns the correct instance of SQLExceptionCode when we detect a limit has been reached, + * depending upon whether a min or max value caused the limit to be exceeded. + */ + public static SQLExceptionCode getLimitReachedErrorCode(boolean increasingSeq) { + SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE + : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; + return code; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java new file mode 100644 index 0000000..4a825f2 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java @@ -0,0 +1,59 @@ +package org.apache.phoenix.schema; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.List; + +import org.apache.phoenix.query.QueryServicesOptions; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SequenceAllocationTest { + + @Test + /** + * Validates that sorting a List of SequenceAllocation instances + * results in the same sort order as sorting SequenceKey instances. 
+ */ + public void testSortingSequenceAllocation() { + + // Arrange + SequenceKey sequenceKey1 = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + SequenceKey sequenceKey2 = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + SequenceKey sequenceKey3 = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + List<SequenceKey> sequenceKeys = Lists.newArrayList(sequenceKey1, sequenceKey2, sequenceKey3); + List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKey2, 1), new SequenceAllocation(sequenceKey1, 1), new SequenceAllocation(sequenceKey3, 1)); + + // Act + Collections.sort(sequenceKeys); + Collections.sort(sequenceAllocations); + + // Assert + int i = 0; + for (SequenceKey sequenceKey : sequenceKeys) { + assertEquals(sequenceKey, sequenceAllocations.get(i).getSequenceKey()); + i++; + } + } + + @Test + public void testSortingSequenceAllocationPreservesAllocations() { + + // Arrange + SequenceKey sequenceKeyC = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + SequenceKey sequenceKeyB = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + SequenceKey sequenceKeyA = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKeyB, 15), new SequenceAllocation(sequenceKeyC, 11), new SequenceAllocation(sequenceKeyA, 1000)); + + // Act + Collections.sort(sequenceAllocations); + + // Assert + assertEquals("sequenceA",sequenceAllocations.get(0).getSequenceKey().getSequenceName()); + assertEquals(1000,sequenceAllocations.get(0).getNumAllocations()); + assertEquals(15,sequenceAllocations.get(1).getNumAllocations()); + 
assertEquals(11,sequenceAllocations.get(2).getNumAllocations()); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java index f25a213..2abc482 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java @@ -62,4 +62,58 @@ public class SequenceUtilTest { public void testDescendingOverflowCycle() throws SQLException { assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE)); } + + @Test + public void testBulkAllocationAscendingNextValueGreaterThanMax() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 1)); + } + + @Test + public void testBulkAllocationAscendingNextValueReachLimit() throws SQLException { + assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2)); + } + + @Test + public void testBulkAllocationAscendingNextValueWithinLimit() throws SQLException { + assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, 2)); + + } + + @Test + public void testBulkAllocationAscendingOverflow() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE, 100)); + } + + + @Test + public void testBulkAllocationDescendingNextValueLessThanMax() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(10, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 5)); + } + + @Test + public void testBulkAllocationDescendingNextValueReachLimit() throws 
SQLException { + assertFalse(SequenceUtil.checkIfLimitReached(7, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 3)); + } + + @Test + public void testBulkAllocationDescendingNextValueWithinLimit() throws SQLException { + assertFalse(SequenceUtil.checkIfLimitReached(8, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, 2)); + + } + + @Test + public void testBulkAllocationDescendingOverflowCycle() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE, 100)); + } + + @Test + public void testIsCycleAllowedForBulkAllocation() { + assertFalse(SequenceUtil.isCycleAllowed(2)); + } + + @Test + public void testIsCycleAllowedForStandardAllocation() { + assertTrue(SequenceUtil.isCycleAllowed(1)); + } + }
