Repository: phoenix Updated Branches: refs/heads/3.0 ef27a9f56 -> 7cc2b5a54
PHOENIX-1016 Support MINVALUE, MAXVALUE, and CYCLE options in CREATE SEQUENCE (Thomas D'Silva) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7cc2b5a5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7cc2b5a5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7cc2b5a5 Branch: refs/heads/3.0 Commit: 7cc2b5a54e98b7db990c2b81bcc269c49509ce26 Parents: ef27a9f Author: James Taylor <jtay...@salesforce.com> Authored: Mon Jul 28 19:46:50 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Jul 28 19:46:50 2014 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/SequenceIT.java | 19 +- .../phoenix/coprocessor/MetaDataProtocol.java | 2 +- .../coprocessor/SequenceRegionObserver.java | 223 +++++++++++-------- ...SkipRangeParallelIteratorRegionSplitter.java | 8 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 4 +- .../query/ConnectionlessQueryServicesImpl.java | 38 +++- .../apache/phoenix/query/QueryConstants.java | 4 +- .../org/apache/phoenix/schema/Sequence.java | 85 ++++--- .../org/apache/phoenix/schema/SequenceInfo.java | 2 + .../org/apache/phoenix/util/KeyValueUtil.java | 8 - .../org/apache/phoenix/util/SequenceUtil.java | 29 +-- .../apache/phoenix/util/SequenceUtilTest.java | 78 ++----- 13 files changed, 262 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java index 0208d17..78f19a7 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java @@ -43,6 +43,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SequenceUtil; +import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -55,7 +56,7 @@ import com.google.common.collect.Maps; public class SequenceIT extends BaseClientManagedTimeIT { private static final String NEXT_VAL_SQL = "SELECT NEXT VALUE FOR foo.bar FROM SYSTEM.\"SEQUENCE\""; private static final long BATCH_SIZE = 3; - + private Connection conn; @BeforeClass @@ -69,6 +70,13 @@ public class SequenceIT extends BaseClientManagedTimeIT { setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator())); } + @After + public void tearDown() throws Exception { + // close any open connection between tests, so that connections are not leaked + if (conn != null) { + conn.close(); + } + } @Test public void testSystemTable() throws Exception { @@ -114,7 +122,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertTrue(rs.next()); assertEquals("ALPHA", rs.getString("sequence_schema")); assertEquals("OMEGA", rs.getString("sequence_name")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(4, rs.getInt("increment_by")); assertFalse(rs.next()); } @@ -152,7 +160,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertTrue(rs.next()); assertEquals("ALPHA", rs.getString("sequence_schema")); assertEquals("OMEGA", rs.getString("sequence_name")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(4, rs.getInt("increment_by")); assertFalse(rs.next()); @@ -208,7 +216,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { "SELECT start_with, 
current_value, increment_by, cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name FROM SYSTEM.\"SEQUENCE\""); assertTrue(rs.next()); assertEquals(2, rs.getLong("start_with")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(3, rs.getLong("increment_by")); assertEquals(5, rs.getLong("cache_size")); assertEquals(0, rs.getLong("min_value")); @@ -654,7 +662,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { rs = conn.createStatement().executeQuery("SELECT sequence_name, current_value FROM SYSTEM.\"SEQUENCE\" WHERE sequence_name='BAR'"); assertTrue(rs.next()); assertEquals("BAR", rs.getString(1)); - assertEquals(null, rs.getBytes(2)); + assertEquals(1, rs.getInt(2)); conn.close(); conn2.close(); @@ -690,6 +698,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertEquals(4, rs.getInt(1)); } + // if nextConnection() is not used to get a connection, make sure you call .close() so that connections are not leaked private void nextConnection() throws Exception { if (conn != null) conn.close(); long ts = nextTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index cd4bd5e..a303b95 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -64,7 +64,7 @@ public interface MetaDataProtocol extends CoprocessorProtocol { public static final long MIN_TABLE_TIMESTAMP = 0; // Incremented with the addition of INDEX_TYPE to SYSTEM.CATALOG (though it's unused in 3.0) // plus the addition of MIN_VALUE, 
MAX_VALUE, and CYCLE to SYSTEM.SEQUENCE. - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 2; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 3; public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index cc3c0b4..944c4e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -53,6 +54,10 @@ import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SequenceUtil; import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * @@ -82,6 +87,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf))); } + /** * Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in Increment * implementation (HBASE-10254): @@ -125,104 +131,113 @@ public class SequenceRegionObserver extends BaseRegionObserver { return result; } + KeyValue currentValueKV = 
Sequence.getCurrentValueKV(result); KeyValue incrementByKV = Sequence.getIncrementByKV(result); - KeyValue currentValueKV = Sequence.getCurrentValueKV(result); - KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result); - KeyValue cycleKV = Sequence.getCycleKV(result); - KeyValue minValueKV = Sequence.getMinValueKV(result); - KeyValue maxValueKV = Sequence.getMaxValueKV(result); + KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result); + + long currentValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), SortOrder.getDefault()); + long incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), incrementByKV.getValueOffset(), SortOrder.getDefault()); + long cacheSize = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); // Hold timestamp constant for sequences, so that clients always only see the latest // value regardless of when they connect. - Put put = new Put(row, currentValueKV.getTimestamp()); - - // create a copy of the key values, used for the new Return - List<KeyValue> newkvs = Sequence.getCells(result); + long timestamp = currentValueKV.getTimestamp(); + Put put = new Put(row, timestamp); - long incrementBy = - PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), - incrementByKV.getValueOffset(), SortOrder.getDefault()); - - long cacheSize = - PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), - cacheSizeKV.getValueOffset(), SortOrder.getDefault()); + int numIncrementKVs = increment.getFamilyMap().get(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES).size(); + // creates the list of KeyValues used for the Result that will be returned + List<KeyValue> newkvs = Sequence.getKeyValueList(result, numIncrementKVs); - // if the minValue, maxValue, or cycle is null this sequence has been upgraded from - // a lower version. 
Set minValue, maxValue and cycle to Long.MIN_VALUE, Long.MAX_VALUE and true - // respectively in order to maintain existing behavior and also update the KeyValues on the server - long minValue; - if (minValueKV == null) { - minValue = Long.MIN_VALUE; - // create new key value for put - byte[] newMinValueBuffer = new byte[PDataType.LONG.getByteSize()]; - PDataType.LONG.getCodec().encodeLong(minValue, newMinValueBuffer, 0); - KeyValue newMinValueKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.MIN_VALUE_BYTES, currentValueKV.getTimestamp(), newMinValueBuffer); - put.add(newMinValueKV); - // update key value in returned Result - Sequence.replaceMinValueKV(newkvs, newMinValueKV); - } - else { - minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(), - minValueKV.getValueOffset(), SortOrder.getDefault()); - } - long maxValue; - if (maxValueKV == null) { - maxValue = Long.MAX_VALUE; - // create new key value for put - byte[] newMaxValueBuffer = new byte[PDataType.LONG.getByteSize()]; - PDataType.LONG.getCodec().encodeLong(maxValue, newMaxValueBuffer, 0); - KeyValue newMaxValueKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.MAX_VALUE_BYTES, currentValueKV.getTimestamp(), newMaxValueBuffer); - put.add(newMaxValueKV); - // update key value in returned Result - Sequence.replaceMaxValueKV(newkvs, newMaxValueKV); - } - else { - maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(), - maxValueKV.getValueOffset(), SortOrder.getDefault()); - } - boolean cycle; - if (cycleKV == null) { - cycle = false; - // create new key value for put - KeyValue newCycleKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, currentValueKV.getTimestamp(), PDataType.FALSE_BYTES); - put.add(newCycleKV); - // update key value in returned Result - 
Sequence.replaceCycleValueKV(newkvs, newCycleKV); - } - else { - cycle = (Boolean) PDataType.BOOLEAN.toObject(cycleKV.getBuffer(), - cycleKV.getValueOffset(), cycleKV.getValueLength()); - } - long currentValue; - // initialize current value to start value - if (currentValueKV.getValueLength()==0) { - KeyValue startValueKV = Sequence.getStartValueKV(result); - currentValue = - PDataType.LONG.getCodec().decodeLong(startValueKV.getBuffer(), - startValueKV.getValueOffset(), SortOrder.getDefault()); + //if client is 3.0/4.0 preserve the old behavior (older clients won't have newer columns present in the increment) + if (numIncrementKVs != Sequence.NUM_SEQUENCE_KEY_VALUES) { + currentValue += incrementBy * cacheSize; + // Hold timestamp constant for sequences, so that clients always only see the latest value + // regardless of when they connect. + KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp); + put.add(newCurrentValueKV); + Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV); } else { - currentValue = - PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), - currentValueKV.getValueOffset(), SortOrder.getDefault()); - try { - // set currentValue to nextValue - currentValue = - SequenceUtil.getNextValue(currentValue, minValue, maxValue, - incrementBy, cacheSize, cycle); - } catch (SQLException sqlE) { - return getErrorResult(row, maxTimestamp, sqlE.getErrorCode()); - } + KeyValue cycleKV = Sequence.getCycleKV(result); + KeyValue limitReachedKV = Sequence.getLimitReachedKV(result); + KeyValue minValueKV = Sequence.getMinValueKV(result); + KeyValue maxValueKV = Sequence.getMaxValueKV(result); + + boolean increasingSeq = incrementBy > 0 ? true : false; + + // if the minValue, maxValue, cycle and limitReached is null this sequence has been upgraded from + // a lower version. 
Set minValue, maxValue, cycle and limitReached to Long.MIN_VALUE, Long.MAX_VALUE, true and false + // respectively in order to maintain existing behavior and also update the KeyValues on the server + boolean limitReached; + if (limitReachedKV == null) { + limitReached = false; + KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp); + put.add(newLimitReachedKV); + Sequence.replaceLimitReachedKV(newkvs, newLimitReachedKV); + } + else { + limitReached = (Boolean) PDataType.BOOLEAN.toObject(limitReachedKV.getBuffer(), + limitReachedKV.getValueOffset(), limitReachedKV.getValueLength()); + } + long minValue; + if (minValueKV == null) { + minValue = Long.MIN_VALUE; + KeyValue newMinValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp); + put.add(newMinValueKV); + Sequence.replaceMinValueKV(newkvs, newMinValueKV); + } + else { + minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(), + minValueKV.getValueOffset(), SortOrder.getDefault()); + } + long maxValue; + if (maxValueKV == null) { + maxValue = Long.MAX_VALUE; + KeyValue newMaxValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp); + put.add(newMaxValueKV); + Sequence.replaceMaxValueKV(newkvs, newMaxValueKV); + } + else { + maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(), + maxValueKV.getValueOffset(), SortOrder.getDefault()); + } + boolean cycle; + if (cycleKV == null) { + cycle = false; + KeyValue newCycleKV = createKeyValue(row, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp); + put.add(newCycleKV); + Sequence.replaceCycleValueKV(newkvs, newCycleKV); + } + else { + cycle = (Boolean) PDataType.BOOLEAN.toObject(cycleKV.getBuffer(), + cycleKV.getValueOffset(), cycleKV.getValueLength()); + } + + // return if we have run out of sequence values + if (limitReached) { + if (cycle) { + // reset currentValue of the Sequence row to 
minValue/maxValue + currentValue = increasingSeq ? minValue : maxValue; + } + else { + SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE + : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; + return getErrorResult(row, maxTimestamp, code.getErrorCode()); + } + } + + // check if the limit was reached + limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize); + // update currentValue + currentValue += incrementBy * cacheSize; + // update the currentValue of the Result row + KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp); + Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV); + put.add(newCurrentValueKV); + // set the LIMIT_REACHED column to true, so that no new values can be used + KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp); + put.add(newLimitReachedKV); } - byte[] newCurrentValueBuffer = new byte[PDataType.LONG.getByteSize()]; - PDataType.LONG.getCodec().encodeLong(currentValue, newCurrentValueBuffer, 0); - KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, currentValueKV, newCurrentValueBuffer); - put.add(newCurrentValueKV); - Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV); - // update the KeyValues on the server @SuppressWarnings("unchecked") Pair<Mutation,Integer>[] mutations = new Pair[1]; @@ -240,6 +255,36 @@ public class SequenceRegionObserver extends BaseRegionObserver { region.closeRegionOperation(); } } + + /** + * Creates a new KeyValue for a long value + * + * @param key + * key used while creating KeyValue + * @param cqBytes + * column qualifier of KeyValue + * @return return the KeyValue that was created + */ + KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long timestamp) { + byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()]; + 
PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0); + return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer); + } + + /** + * Creates a new KeyValue for a boolean value and adds it to the given put + * + * @param key + * key used while creating KeyValue + * @param cqBytes + * column qualifier of KeyValue + * @return return the KeyValue that was created + */ + private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean value, long timestamp) throws IOException { + // create new key value for put + return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, + timestamp, value ? PDataType.TRUE_BYTES : PDataType.FALSE_BYTES); + } /** * Override the preAppend for checkAndPut and checkAndDelete, as we need the ability to http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java index 8312fe7..3c3f933 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java @@ -21,10 +21,6 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.HRegionLocation; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.parse.HintNode; @@ -32,6 +28,10 @@ import org.apache.phoenix.query.KeyRange; import 
org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + /** * Split the region according to the information contained in the scan's SkipScanFilter. http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index de799ce..310ec03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -199,7 +199,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] CURRENT_VALUE_BYTES = Bytes.toBytes(CURRENT_VALUE); public static final String START_WITH = "START_WITH"; public static final byte[] START_WITH_BYTES = Bytes.toBytes(START_WITH); - // MIN_VALUE, MAX_VALUE and CYCLE_FLAG were added in 3.0 + // MIN_VALUE, MAX_VALUE, CYCLE_FLAG and LIMIT_FLAG were added in 3.1/4.1 public static final String MIN_VALUE = "MIN_VALUE"; public static final byte[] MIN_VALUE_BYTES = Bytes.toBytes(MIN_VALUE); public static final String MAX_VALUE = "MAX_VALUE"; @@ -210,6 +210,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] CACHE_SIZE_BYTES = Bytes.toBytes(CACHE_SIZE); public static final String CYCLE_FLAG = "CYCLE_FLAG"; public static final byte[] CYCLE_FLAG_BYTES = Bytes.toBytes(CYCLE_FLAG); + public static final String LIMIT_REACHED_FLAG = "LIMIT_REACHED_FLAG"; + public static final byte[] LIMIT_REACHED_FLAG_BYTES = Bytes.toBytes(LIMIT_REACHED_FLAG); public static final String 
KEY_SEQ = "KEY_SEQ"; public static final byte[] KEY_SEQ_BYTES = Bytes.toBytes(KEY_SEQ); public static final String SUPERTABLE_NAME = "SUPERTABLE_NAME"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index b13ced2..e6c99fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -19,6 +19,7 @@ package org.apache.phoenix.query; import static com.google.common.io.Closeables.closeQuietly; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; @@ -1301,7 +1302,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String newColumns = MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " - + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); + + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " + + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 1c1bba7..ee4c577 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -49,6 +50,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PMetaDataImpl; @@ -63,6 +65,7 @@ import org.apache.phoenix.schema.SequenceAlreadyExistsException; import org.apache.phoenix.schema.SequenceInfo; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.SequenceNotFoundException; +import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; @@ -356,18 +359,29 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { int i = 0; - for (SequenceKey key : sequenceKeys) { - SequenceInfo info = sequenceMap.get(key); - if (info == null) { - exceptions[i] = - new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName()); - } else { - values[i] = info.sequenceValue; - info.sequenceValue = - SequenceUtil.getNextValue(key, info); - } - i++; - } + for (SequenceKey key : sequenceKeys) { + SequenceInfo info = sequenceMap.get(key); + if (info == null) { + exceptions[i] = new SequenceNotFoundException( + key.getSchemaName(), key.getSequenceName()); + } else { + boolean increaseSeq = info.incrementBy > 0; + if (info.limitReached) { + SQLExceptionCode code = increaseSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE + : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; + exceptions[i] = new SQLExceptionInfo.Builder(code).build().buildException(); + } else { + values[i] = info.sequenceValue; + info.sequenceValue += info.incrementBy * info.cacheSize; + info.limitReached = SequenceUtil.checkIfLimitReached(info); + if (info.limitReached && info.cycle) { + info.sequenceValue = increaseSeq ? 
info.minValue : info.maxValue; + info.limitReached = false; + } + } + } + i++; + } i = 0; for (SQLException e : exceptions) { if (e != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 20ab137..107b03f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -41,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; @@ -227,7 +228,8 @@ public interface QueryConstants { // the following three columns were added in 3.0/4.0 MIN_VALUE + " BIGINT, \n" + MAX_VALUE + " BIGINT, \n" + - CYCLE_FLAG + " BOOLEAN \n" + + CYCLE_FLAG + " BOOLEAN, \n" + + LIMIT_REACHED_FLAG + " BOOLEAN \n" + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java ---------------------------------------------------------------------- diff 
--git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java index eff39ab..e992126 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java @@ -22,10 +22,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -65,19 +65,19 @@ public class Sequence { private static final KeyValue CURRENT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CURRENT_VALUE_BYTES); private static final KeyValue INCREMENT_BY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, INCREMENT_BY_BYTES); private static final KeyValue CACHE_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CACHE_SIZE_BYTES); - private static final KeyValue START_WITH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, START_WITH_BYTES); private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, MIN_VALUE_BYTES); private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, 
SEQUENCE_FAMILY_BYTES, MAX_VALUE_BYTES); private static final KeyValue CYCLE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CYCLE_FLAG_BYTES); + private static final KeyValue LIMIT_REACHED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, LIMIT_REACHED_FLAG_BYTES); private static final List<KeyValue> SEQUENCE_KV_COLUMNS = Arrays.<KeyValue>asList( CURRENT_VALUE_KV, INCREMENT_BY_KV, CACHE_SIZE_KV, - START_WITH_KV, - // the following three columns were added in 3.0/4.0 + // The following four columns were added in 3.1/4.1 MIN_VALUE_KV, MAX_VALUE_KV, - CYCLE_KV + CYCLE_KV, + LIMIT_REACHED_KV ); static { Collections.sort(SEQUENCE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -86,12 +86,12 @@ public class Sequence { private static final int CURRENT_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CURRENT_VALUE_KV); private static final int INCREMENT_BY_INDEX = SEQUENCE_KV_COLUMNS.indexOf(INCREMENT_BY_KV); private static final int CACHE_SIZE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CACHE_SIZE_KV); - private static final int START_WITH_INDEX = SEQUENCE_KV_COLUMNS.indexOf(START_WITH_KV); private static final int MIN_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(MIN_VALUE_KV); private static final int MAX_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(MAX_VALUE_KV); private static final int CYCLE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CYCLE_KV); + private static final int LIMIT_REACHED_INDEX = SEQUENCE_KV_COLUMNS.indexOf(LIMIT_REACHED_KV); - private static final int NUM_SEQUENCE_KEY_VALUES = SEQUENCE_KV_COLUMNS.size(); + public static final int NUM_SEQUENCE_KEY_VALUES = SEQUENCE_KV_COLUMNS.size(); private static final EmptySequenceCacheException EMPTY_SEQUENCE_CACHE_EXCEPTION = new EmptySequenceCacheException(); private final SequenceKey key; @@ -158,7 +158,6 @@ public class Sequence { long returnValue = value.currentValue; if (factor != 0) { - --value.unusedValues; boolean overflowOrUnderflow=false; // advance currentValue while checking for 
overflow try { @@ -183,7 +182,7 @@ public class Sequence { if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } - if (value.unusedValues == 0) { + if (value.currentValue == value.nextValue) { if (action == ValueOp.VALIDATE_SEQUENCE) { return value.currentValue; } @@ -198,7 +197,7 @@ public class Sequence { } List<Append> appends = Lists.newArrayListWithExpectedSize(values.size()); for (SequenceValue value : values) { - if (value.isInitialized() && value.unusedValues>0) { + if (value.isInitialized() && value.currentValue != value.nextValue) { appends.add(newReturn(value)); } } @@ -210,7 +209,7 @@ public class Sequence { if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } - if (value.unusedValues==0) { + if (value.currentValue == value.nextValue) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } return newReturn(value); @@ -221,11 +220,12 @@ public class Sequence { Append append = new Append(key); byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()}; append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf); - append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PDataType.LONG.toBytes(value.startValue)); + append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PDataType.LONG.toBytes(value.nextValue)); Map<byte[], List<KeyValue>> familyMap = append.getFamilyMap(); familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, Arrays.<KeyValue>asList( - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, ByteUtil.EMPTY_BYTE_ARRAY), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, value.timestamp, PDataType.LONG.toBytes(value.currentValue)) + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, PDataType.LONG.toBytes(value.currentValue)), + // set LIMIT_REACHED flag to false 
since we are returning unused sequence values + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, PDataType.FALSE_BYTES) )); return append; } @@ -299,9 +299,9 @@ public class Sequence { private static KeyValue getKeyValue(Result r, KeyValue kv, int cellIndex) { KeyValue[] kvs = r.raw(); // if the sequence row is from a previous version then MIN_VALUE, MAX_VALUE and CYCLE key values are not present, - // the sequence row has only four columns (START_VALUE, INCREMENT_BY, CACHE_SIZE and CURRENT_VALUE) and the order of the cells + // the sequence row has only three columns (INCREMENT_BY, CACHE_SIZE and CURRENT_VALUE) and the order of the cells // in the array returned by rawCells() is not what we expect so use getColumnLatestCell() to get the cell we want - return kvs.length != NUM_SEQUENCE_KEY_VALUES ? r.getColumnLatest(kv.getFamily(), kv.getQualifier()) : (kvs[cellIndex]); + return kvs.length == NUM_SEQUENCE_KEY_VALUES ? 
kvs[cellIndex] : r.getColumnLatest(kv.getFamily(), kv.getQualifier()); } private static KeyValue getKeyValue(Result r, KeyValue kv) { @@ -320,10 +320,6 @@ public class Sequence { return getKeyValue(r, CACHE_SIZE_KV, CACHE_SIZE_INDEX); } - public static KeyValue getStartValueKV(Result r) { - return getKeyValue(r, START_WITH_KV, START_WITH_INDEX); - } - public static KeyValue getMinValueKV(Result r) { return getKeyValue(r, MIN_VALUE_KV, MIN_VALUE_INDEX); } @@ -336,6 +332,10 @@ public class Sequence { return getKeyValue(r, CYCLE_KV, CYCLE_INDEX); } + public static KeyValue getLimitReachedKV(Result r) { + return getKeyValue(r, LIMIT_REACHED_KV, LIMIT_REACHED_INDEX); + } + public static void replaceCurrentValueKV(List<KeyValue> kvs, KeyValue currentValueKV) { kvs.set(CURRENT_VALUE_INDEX, currentValueKV); } @@ -351,16 +351,31 @@ public class Sequence { public static void replaceCycleValueKV(List<KeyValue> kvs, KeyValue cycleValueKV) { kvs.set(CYCLE_INDEX, cycleValueKV); } + public static void replaceLimitReachedKV(List<KeyValue> kvs, KeyValue limitReachedKV) { + kvs.set(LIMIT_REACHED_INDEX, limitReachedKV); + } /** - * Returns a KeyValue[] for the result row. 
Handles empty MIN_VALUE, MAX_VALUE and CYCLE - * KeyValues if the sequence row is from a previous version + * Returns the KeyValues of r if it contains the expected number of KeyValues, + * else returns a list of KeyValues corresponding to SEQUENCE_KV_COLUMNS */ - public static List<KeyValue> getCells(Result r) { + public static List<KeyValue> getKeyValueList(Result r, int numKVs) { // if the sequence row is from a previous version - if (r.raw().length == NUM_SEQUENCE_KEY_VALUES ) + if (r.raw().length == numKVs ) return Lists.newArrayList(r.raw()); - // else we need to handle missing MIN_VALUE, MAX_VALUE and CYCLE KeyValues + // else we need to handle missing MIN_VALUE, MAX_VALUE, CYCLE and LIMIT_REACHED KeyValues + List<KeyValue> kvList = Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES); + for (KeyValue kv : SEQUENCE_KV_COLUMNS) { + kvList.add(getKeyValue(r,kv)); + } + return kvList; + } + + /** + * Returns a list of KeyValues for the result row to be returned, adding only those KeyValues in r + * that are present in SEQUENCE_KV_COLUMNS + */ + public static List<KeyValue> getCells(Result r) { List<KeyValue> kvList = Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES); for (KeyValue kv : SEQUENCE_KV_COLUMNS) { kvList.add(getKeyValue(r,kv)); @@ -371,15 +386,13 @@ public class Sequence { private static final class SequenceValue { public final long incrementBy; public final long timestamp; + public final long cacheSize; public long currentValue; - // start value of the current batch - public long startValue; + public long nextValue; public long minValue; public long maxValue; public boolean cycle; - // number of values left in current batch - public long unusedValues; public boolean isDeleted; public boolean limitReached; @@ -395,6 +408,7 @@ public class Sequence { this.isDeleted = isDeleted; this.incrementBy = 0; this.limitReached = false; + this.cacheSize = 0; } public boolean isInitialized() { @@ -413,16 +427,14 @@ public class Sequence { KeyValue 
maxValueKV = getMaxValueKV(r); KeyValue cycleKV = getCycleKV(r); this.timestamp = currentValueKV.getTimestamp(); - this.currentValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), SortOrder.getDefault()); + this.nextValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), SortOrder.getDefault()); this.incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), incrementByKV.getValueOffset(), SortOrder.getDefault()); - this.unusedValues = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); + this.cacheSize = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); this.minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(), minValueKV.getValueOffset(), SortOrder.getDefault()); this.maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(), maxValueKV.getValueOffset(), SortOrder.getDefault()); this.cycle = (Boolean)PDataType.BOOLEAN.toObject(cycleKV.getBuffer(), cycleKV.getValueOffset(), cycleKV.getValueLength()); this.limitReached = false; - // store the start value of this batch of sequence values, so that it can be used to - // determine if we can return unused sequence values when we close the connection - this.startValue = this.currentValue; + currentValue = nextValue - incrementBy * cacheSize; } } @@ -463,13 +475,14 @@ public class Sequence { byte[] startWithBuf = PDataType.LONG.toBytes(startWith); familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, Arrays.<KeyValue>asList( KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), + 
KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf), KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf), KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, PDataType.LONG.toBytes(incrementBy)), KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, PDataType.LONG.toBytes(cacheSize)), KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(minValue)), KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(maxValue)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PDataType.BOOLEAN.toBytes(cycle)) + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PDataType.BOOLEAN.toBytes(cycle)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, PDataType.FALSE_BYTES) )); return append; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java index 26ea132..be4455b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java @@ -17,6 +17,7 @@ public class SequenceInfo { public final long maxValue; public final long cacheSize; public final boolean cycle; + public boolean limitReached; public SequenceInfo(long sequenceValue, long incrementBy, long minValue, long maxValue, long cacheSize, boolean cycle) { this.sequenceValue = sequenceValue; @@ -25,5 +26,6 @@ public class SequenceInfo { this.maxValue = maxValue; this.cacheSize = cacheSize; this.cycle = cycle; + this.limitReached = false; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index 2f10c15..5102faa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -76,14 +76,6 @@ public class KeyValueUtil { value, valueOffset, valueLength); } - public static KeyValue newKeyValue(byte[] key, KeyValue kv, byte[] value) { - return newKeyValue(key, 0, key.length, - kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), - kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), - kv.getTimestamp(), - value, 0, value.length); - } - public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) { return newKeyValue(key,cf,cq,ts,value,0,value.length); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java 
index f17d721..f97d565 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java @@ -15,7 +15,6 @@ import java.sql.SQLException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.SequenceInfo; -import org.apache.phoenix.schema.SequenceKey; import com.google.common.math.LongMath; @@ -28,36 +27,28 @@ public class SequenceUtil { * Returns the nextValue of a sequence * @throws SQLException if cycle is false and the sequence limit has been reached */ - public static long getNextValue(long currentValue, long minValue, long maxValue, - long incrementBy, long cacheSize, boolean cycle) throws SQLException { + public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue, + long incrementBy, long cacheSize) throws SQLException { long nextValue = 0; boolean increasingSeq = incrementBy > 0 ? true : false; - boolean overflowOrUnderflow = false; // advance currentValue while checking for overflow try { long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize); nextValue = LongMath.checkedAdd(currentValue, incrementValue); } catch (ArithmeticException e) { - overflowOrUnderflow = true; + return true; } - // check if overflow or limit was reached - if (overflowOrUnderflow || (increasingSeq && nextValue > maxValue) - || (!increasingSeq && nextValue < minValue)) { - if (cycle) { - nextValue = increasingSeq ? minValue : maxValue; - } else { - SQLExceptionCode code = - increasingSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE - : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; - throw new SQLExceptionInfo.Builder(code).build().buildException(); - } + // check if limit was reached + if ((increasingSeq && nextValue > maxValue) + || (!increasingSeq && nextValue < minValue)) { + return true; } - return nextValue; + return false; } - public static long getNextValue(SequenceKey key, SequenceInfo info) throws SQLException { - return getNextValue(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize, info.cycle); + public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException { + return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java index bb1ef49..f25a213 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java @@ -10,11 +10,11 @@ */ package org.apache.phoenix.util; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.sql.SQLException; -import org.apache.phoenix.exception.SQLExceptionCode; import org.junit.Test; public class SequenceUtilTest { @@ -25,93 +25,41 @@ public class SequenceUtilTest { @Test public void testAscendingNextValueWithinLimit() throws SQLException { - assertEquals(9, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, 
CACHE_SIZE)); } @Test public void testAscendingNextValueReachLimit() throws SQLException { - assertEquals(MAX_VALUE, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE)); } @Test - public void testAscendingNextValueGreaterThanMaxValueNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), - e.getErrorCode()); - } - } - - @Test - public void testAscendingNextValueGreaterThanMaxValueCycle() throws SQLException { - assertEquals(MIN_VALUE, SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, - 2/* incrementBy */, CACHE_SIZE, true)); - } - - @Test - public void testAscendingOverflowNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), - e.getErrorCode()); - } + public void testAscendingNextValueGreaterThanMaxValue() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE)); } @Test - public void testAscendingOverflowCycle() throws SQLException { - assertEquals(0, SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, - 1/* incrementBy */, CACHE_SIZE, true)); + public void testAscendingOverflow() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingNextValueWithinLimit() throws SQLException { - assertEquals(2, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, - CACHE_SIZE, false)); + 
assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingNextValueReachLimit() throws SQLException { - assertEquals(MIN_VALUE, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test - public void testDescendingNextValueLessThanMinValueNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(1, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(), - e.getErrorCode()); - } - } - - @Test - public void testDescendingNextValueLessThanMinValueCycle() throws SQLException { - assertEquals(MAX_VALUE, SequenceUtil.getNextValue(2, MIN_VALUE, MAX_VALUE, - -2/* incrementBy */, CACHE_SIZE, true)); - } - - @Test - public void testDescendingOverflowNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(), - e.getErrorCode()); - } + public void testDescendingNextValueLessThanMinValue() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(2, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingOverflowCycle() throws SQLException { - assertEquals(0, SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, - -1/* incrementBy */, CACHE_SIZE, true)); + assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE)); } }