Repository: phoenix Updated Branches: refs/heads/4.0 d372c6591 -> aba5ea906
PHOENIX-1016 Support MINVALUE, MAXVALUE, and CYCLE options in CREATE SEQUENCE (Thomas D'Silva) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aba5ea90 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aba5ea90 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aba5ea90 Branch: refs/heads/4.0 Commit: aba5ea906424238664ef61dbb04e39de5896cc43 Parents: d372c65 Author: James Taylor <[email protected]> Authored: Mon Jul 28 20:28:52 2014 -0700 Committer: James Taylor <[email protected]> Committed: Mon Jul 28 20:28:52 2014 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/SequenceIT.java | 19 +- .../phoenix/coprocessor/MetaDataProtocol.java | 2 +- .../coprocessor/SequenceRegionObserver.java | 226 +++++++++++-------- .../org/apache/phoenix/hbase/index/Indexer.java | 10 - .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 6 +- .../query/ConnectionlessQueryServicesImpl.java | 35 ++- .../apache/phoenix/query/QueryConstants.java | 4 +- .../org/apache/phoenix/schema/Sequence.java | 94 ++++---- .../org/apache/phoenix/schema/SequenceInfo.java | 2 + .../trace/PhoenixTableMetricsWriter.java | 4 +- .../org/apache/phoenix/util/KeyValueUtil.java | 14 +- .../org/apache/phoenix/util/SequenceUtil.java | 29 +-- .../apache/phoenix/util/SequenceUtilTest.java | 78 ++----- 14 files changed, 259 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java index 0208d17..3e0301c 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java @@ -43,6 +43,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SequenceUtil; +import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -55,7 +56,7 @@ import com.google.common.collect.Maps; public class SequenceIT extends BaseClientManagedTimeIT { private static final String NEXT_VAL_SQL = "SELECT NEXT VALUE FOR foo.bar FROM SYSTEM.\"SEQUENCE\""; private static final long BATCH_SIZE = 3; - + private Connection conn; @BeforeClass @@ -69,6 +70,13 @@ public class SequenceIT extends BaseClientManagedTimeIT { setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator())); } + @After + public void tearDown() throws Exception { + // close any open connection between tests, so that connections are not leaked + if (conn != null) { + conn.close(); + } + } @Test public void testSystemTable() throws Exception { @@ -114,7 +122,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertTrue(rs.next()); assertEquals("ALPHA", rs.getString("sequence_schema")); assertEquals("OMEGA", rs.getString("sequence_name")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(4, rs.getInt("increment_by")); assertFalse(rs.next()); } @@ -152,7 +160,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertTrue(rs.next()); assertEquals("ALPHA", rs.getString("sequence_schema")); assertEquals("OMEGA", rs.getString("sequence_name")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(4, rs.getInt("increment_by")); assertFalse(rs.next()); @@ -208,7 +216,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { "SELECT start_with, 
current_value, increment_by, cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name FROM SYSTEM.\"SEQUENCE\""); assertTrue(rs.next()); assertEquals(2, rs.getLong("start_with")); - assertEquals(null, rs.getBytes("current_value")); + assertEquals(2, rs.getInt("current_value")); assertEquals(3, rs.getLong("increment_by")); assertEquals(5, rs.getLong("cache_size")); assertEquals(0, rs.getLong("min_value")); @@ -654,7 +662,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { rs = conn.createStatement().executeQuery("SELECT sequence_name, current_value FROM SYSTEM.\"SEQUENCE\" WHERE sequence_name='BAR'"); assertTrue(rs.next()); assertEquals("BAR", rs.getString(1)); - assertEquals(null, rs.getBytes(2)); + assertEquals(1, rs.getInt(2)); conn.close(); conn2.close(); @@ -690,6 +698,7 @@ public class SequenceIT extends BaseClientManagedTimeIT { assertEquals(4, rs.getInt(1)); } + // if nextConnection() is not used to get a connection, make sure you call .close() so that connections are not leaked private void nextConnection() throws Exception { if (conn != null) conn.close(); long ts = nextTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 0517895..8e61f1b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -60,7 +60,7 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_TABLE_TIMESTAMP = 0; // Each time a column is added to the SYSTEM.CATALOG, this should be increased. 
// Adding INDEX_TYPE column for local indexing - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 2; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 3; public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index f537bd2..9754d00 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -23,6 +23,8 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -54,6 +56,8 @@ import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SequenceUtil; import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; @@ -127,8 +131,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) { byte[] cf = entry.getKey(); for (Cell cq : entry.getValue()) { - long value = PDataType.LONG.getCodec().decodeLong(cq.getValueArray(), cq.getValueOffset(), - SortOrder.getDefault()); + long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset()); 
get.addColumn(cf, CellUtil.cloneQualifier(cq)); validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value); } @@ -141,109 +144,118 @@ public class SequenceRegionObserver extends BaseRegionObserver { return result; } + KeyValue currentValueKV = Sequence.getCurrentValueKV(result); KeyValue incrementByKV = Sequence.getIncrementByKV(result); - KeyValue currentValueKV = Sequence.getCurrentValueKV(result); - KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result); - KeyValue cycleKV = Sequence.getCycleKV(result); - KeyValue minValueKV = Sequence.getMinValueKV(result); - KeyValue maxValueKV = Sequence.getMaxValueKV(result); + KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result); + + long currentValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault()); + long incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(), incrementByKV.getValueOffset(), SortOrder.getDefault()); + long cacheSize = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); // Hold timestamp constant for sequences, so that clients always only see the latest // value regardless of when they connect. 
- Put put = new Put(row, currentValueKV.getTimestamp()); - - // create a copy of the key values, used for the new Return - List<Cell> newkvs = Sequence.getCells(result); + long timestamp = currentValueKV.getTimestamp(); + Put put = new Put(row, timestamp); - long incrementBy = - PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(), - incrementByKV.getValueOffset(), SortOrder.getDefault()); - - long cacheSize = - PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), - cacheSizeKV.getValueOffset(), SortOrder.getDefault()); + int numIncrementKVs = increment.getFamilyCellMap().get(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES).size(); + // creates the list of KeyValues used for the Result that will be returned + List<Cell> cells = Sequence.getCells(result, numIncrementKVs); - // if the minValue, maxValue, or cycle is null this sequence has been upgraded from - // a lower version. Set minValue, maxValue and cycle to Long.MIN_VALUE, Long.MAX_VALUE and true - // respectively in order to maintain existing behavior and also update the KeyValues on the server - long minValue; - if (minValueKV == null) { - minValue = Long.MIN_VALUE; - // create new key value for put - byte[] newMinValueBuffer = new byte[PDataType.LONG.getByteSize()]; - PDataType.LONG.getCodec().encodeLong(minValue, newMinValueBuffer, 0); - KeyValue newMinValueKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.MIN_VALUE_BYTES, currentValueKV.getTimestamp(), newMinValueBuffer); - put.add(newMinValueKV); - // update key value in returned Result - Sequence.replaceMinValueKV(newkvs, newMinValueKV); - } - else { - minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(), - minValueKV.getValueOffset(), SortOrder.getDefault()); - } - long maxValue; - if (maxValueKV == null) { - maxValue = Long.MAX_VALUE; - // create new key value for put - byte[] newMaxValueBuffer = new byte[PDataType.LONG.getByteSize()]; - 
PDataType.LONG.getCodec().encodeLong(maxValue, newMaxValueBuffer, 0); - KeyValue newMaxValueKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.MAX_VALUE_BYTES, currentValueKV.getTimestamp(), newMaxValueBuffer); - put.add(newMaxValueKV); - // update key value in returned Result - Sequence.replaceMaxValueKV(newkvs, newMaxValueKV); - } - else { - maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(), - maxValueKV.getValueOffset(), SortOrder.getDefault()); - } - boolean cycle; - if (cycleKV == null) { - cycle = false; - // create new key value for put - KeyValue newCycleKV = KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, - PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, currentValueKV.getTimestamp(), PDataType.FALSE_BYTES); - put.add(newCycleKV); - // update key value in returned Result - Sequence.replaceCycleValueKV(newkvs, newCycleKV); + //if client is 3.0/4.0 preserve the old behavior (older clients won't have newer columns present in the increment) + if (numIncrementKVs != Sequence.NUM_SEQUENCE_KEY_VALUES) { + currentValue += incrementBy * cacheSize; + // Hold timestamp constant for sequences, so that clients always only see the latest value + // regardless of when they connect. + KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp); + put.add(newCurrentValueKV); + Sequence.replaceCurrentValueKV(cells, newCurrentValueKV); } else { - cycle = (Boolean) PDataType.BOOLEAN.toObject(cycleKV.getValueArray(), - cycleKV.getValueOffset(), cycleKV.getValueLength()); + KeyValue cycleKV = Sequence.getCycleKV(result); + KeyValue limitReachedKV = Sequence.getLimitReachedKV(result); + KeyValue minValueKV = Sequence.getMinValueKV(result); + KeyValue maxValueKV = Sequence.getMaxValueKV(result); + + boolean increasingSeq = incrementBy > 0 ? 
true : false; + + // if the minValue, maxValue, cycle and limitReached are null this sequence has been upgraded from + // a lower version. Set minValue, maxValue, cycle and limitReached to Long.MIN_VALUE, Long.MAX_VALUE, false and false + // respectively in order to maintain existing behavior and also update the KeyValues on the server + boolean limitReached; + if (limitReachedKV == null) { + limitReached = false; + KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp); + put.add(newLimitReachedKV); + Sequence.replaceLimitReachedKV(cells, newLimitReachedKV); + } + else { + limitReached = (Boolean) PDataType.BOOLEAN.toObject(limitReachedKV.getValueArray(), + limitReachedKV.getValueOffset(), limitReachedKV.getValueLength()); + } + long minValue; + if (minValueKV == null) { + minValue = Long.MIN_VALUE; + KeyValue newMinValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp); + put.add(newMinValueKV); + Sequence.replaceMinValueKV(cells, newMinValueKV); + } + else { + minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(), + minValueKV.getValueOffset(), SortOrder.getDefault()); + } + long maxValue; + if (maxValueKV == null) { + maxValue = Long.MAX_VALUE; + KeyValue newMaxValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp); + put.add(newMaxValueKV); + Sequence.replaceMaxValueKV(cells, newMaxValueKV); + } + else { + maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(), + maxValueKV.getValueOffset(), SortOrder.getDefault()); + } + boolean cycle; + if (cycleKV == null) { + cycle = false; + KeyValue newCycleKV = createKeyValue(row, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp); + put.add(newCycleKV); + Sequence.replaceCycleValueKV(cells, newCycleKV); + } + else { + cycle = (Boolean) PDataType.BOOLEAN.toObject(cycleKV.getValueArray(), + cycleKV.getValueOffset(), 
cycleKV.getValueLength()); + } + + // return if we have run out of sequence values + if (limitReached) { + if (cycle) { + // reset currentValue of the Sequence row to minValue/maxValue + currentValue = increasingSeq ? minValue : maxValue; + } + else { + SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE + : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; + return getErrorResult(row, maxTimestamp, code.getErrorCode()); + } + } + + // check if the limit was reached + limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize); + // update currentValue + currentValue += incrementBy * cacheSize; + // update the currentValue of the Result row + KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp); + Sequence.replaceCurrentValueKV(cells, newCurrentValueKV); + put.add(newCurrentValueKV); + // set the LIMIT_REACHED column to true, so that no new values can be used + KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp); + put.add(newLimitReachedKV); } - long currentValue; - // initialize current value to start value - if (currentValueKV.getValueLength()==0) { - KeyValue startValueKV = Sequence.getStartValueKV(result); - currentValue = - PDataType.LONG.getCodec().decodeLong(startValueKV.getValueArray(), - startValueKV.getValueOffset(), SortOrder.getDefault()); - } - else { - currentValue = - PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), - currentValueKV.getValueOffset(), SortOrder.getDefault()); - try { - // set currentValue to nextValue - currentValue = - SequenceUtil.getNextValue(currentValue, minValue, maxValue, - incrementBy, cacheSize, cycle); - } catch (SQLException sqlE) { - return getErrorResult(row, maxTimestamp, sqlE.getErrorCode()); - } - } - byte[] newCurrentValueBuffer = new byte[PDataType.LONG.getByteSize()]; - 
PDataType.LONG.getCodec().encodeLong(currentValue, newCurrentValueBuffer, 0); - KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, currentValueKV, newCurrentValueBuffer); - put.add(newCurrentValueKV); - Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV); - // update the KeyValues on the server Mutation[] mutations = new Mutation[]{put}; region.batchMutate(mutations); // return a Result with the updated KeyValues - return Result.create(newkvs); + return Result.create(cells); } finally { region.releaseRowLocks(locks); } @@ -254,6 +266,36 @@ public class SequenceRegionObserver extends BaseRegionObserver { region.closeRegionOperation(); } } + + /** + * Creates a new KeyValue for a long value + * + * @param key + * key used while creating KeyValue + * @param cqBytes + * column qualifier of KeyValue + * @return the KeyValue that was created + */ + KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long timestamp) { + byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()]; + PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0); + return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer); + } + + /** + * Creates a new KeyValue for a boolean value (the caller is responsible for adding it to a put) + * + * @param key + * key used while creating KeyValue + * @param cqBytes + * column qualifier of KeyValue + * @return the KeyValue that was created + */ + private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean value, long timestamp) throws IOException { + // create new key value for put + return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, + timestamp, value ? 
PDataType.TRUE_BYTES : PDataType.FALSE_BYTES); + } /** * Override the preAppend for checkAndPut and checkAndDelete, as we need the ability to http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index b3bec6e..35eb10d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.builder.IndexBuildManager; import org.apache.phoenix.hbase.index.builder.IndexBuilder; @@ -67,7 +66,6 @@ import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter; import org.apache.phoenix.trace.TracingCompat; import org.apache.phoenix.trace.util.NullSpan; -import org.apache.phoenix.trace.util.Tracing; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; @@ -140,18 +138,10 @@ public class Indexer extends BaseRegionObserver { private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil .encodeVersion("0.94.9"); - /** - * Raw configuration, for tracing. 
Coprocessors generally will get a subset configuration (if - * they are on a per-table basis), so we need the raw one from the server, so we can get the - * actual configuration keys - */ - private Configuration rawConf; - @Override public void start(CoprocessorEnvironment e) throws IOException { try { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - this.rawConf = env.getRegionServerServices().getConfiguration(); String serverName = env.getRegionServerServices().getServerName().getServerName(); if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) { // make sure the right version <-> combinations are allowed. http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 1fabf97..93ada7b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -202,7 +202,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] CURRENT_VALUE_BYTES = Bytes.toBytes(CURRENT_VALUE); public static final String START_WITH = "START_WITH"; public static final byte[] START_WITH_BYTES = Bytes.toBytes(START_WITH); - // MIN_VALUE, MAX_VALUE and CYCLE_FLAG were added in 3.0 + // MIN_VALUE, MAX_VALUE, CYCLE_FLAG and LIMIT_FLAG were added in 3.1/4.1 public static final String MIN_VALUE = "MIN_VALUE"; public static final byte[] MIN_VALUE_BYTES = Bytes.toBytes(MIN_VALUE); public static final String MAX_VALUE = "MAX_VALUE"; @@ -213,6 +213,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] 
CACHE_SIZE_BYTES = Bytes.toBytes(CACHE_SIZE); public static final String CYCLE_FLAG = "CYCLE_FLAG"; public static final byte[] CYCLE_FLAG_BYTES = Bytes.toBytes(CYCLE_FLAG); + public static final String LIMIT_REACHED_FLAG = "LIMIT_REACHED_FLAG"; + public static final byte[] LIMIT_REACHED_FLAG_BYTES = Bytes.toBytes(LIMIT_REACHED_FLAG); public static final String KEY_SEQ = "KEY_SEQ"; public static final byte[] KEY_SEQ_BYTES = Bytes.toBytes(KEY_SEQ); public static final String SUPERTABLE_NAME = "SUPERTABLE_NAME"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index cdc7a2a..4dffc39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -19,6 +19,7 @@ package org.apache.phoenix.query; import static com.google.common.io.Closeables.closeQuietly; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; @@ -1481,8 +1482,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String newColumns = MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " - + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); - metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, + + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " + + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); } } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 22bc271..223abb6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -360,18 +360,29 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { int i = 0; - for (SequenceKey key : sequenceKeys) { - SequenceInfo info = sequenceMap.get(key); - if (info == null) { - exceptions[i] = - new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName()); - } else { - values[i] = info.sequenceValue; - info.sequenceValue = - SequenceUtil.getNextValue(key, info); - } - i++; - } + for (SequenceKey key : sequenceKeys) { + SequenceInfo info = sequenceMap.get(key); + if (info == null) { + exceptions[i] = new SequenceNotFoundException( + key.getSchemaName(), key.getSequenceName()); + } else { + boolean increaseSeq = info.incrementBy > 0; + if (info.limitReached) { + SQLExceptionCode code = increaseSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE + : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; + exceptions[i] = new SQLExceptionInfo.Builder(code).build().buildException(); + } else { + values[i] = info.sequenceValue; + info.sequenceValue += info.incrementBy * info.cacheSize; + info.limitReached = SequenceUtil.checkIfLimitReached(info); + if (info.limitReached && info.cycle) { + info.sequenceValue = increaseSeq ? info.minValue : info.maxValue; + info.limitReached = false; + } + } + } + i++; + } i = 0; for (SQLException e : exceptions) { if (e != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index a27aae6..07f6612 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -42,6 +42,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; @@ -229,7 +230,8 @@ public interface QueryConstants { // the following three columns were added in 3.1/4.1 MIN_VALUE + " BIGINT, \n" + MAX_VALUE + " BIGINT, \n" + - CYCLE_FLAG + " BOOLEAN \n" + + CYCLE_FLAG + " BOOLEAN, \n" + + LIMIT_REACHED_FLAG + " 
BOOLEAN \n" + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java index 7c47a5f..4dff12c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java @@ -22,10 +22,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -66,19 +66,19 @@ public class Sequence { private static final KeyValue CURRENT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CURRENT_VALUE_BYTES); private static final KeyValue INCREMENT_BY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, INCREMENT_BY_BYTES); private static final KeyValue CACHE_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, 
SEQUENCE_FAMILY_BYTES, CACHE_SIZE_BYTES); - private static final KeyValue START_WITH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, START_WITH_BYTES); private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, MIN_VALUE_BYTES); private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, MAX_VALUE_BYTES); private static final KeyValue CYCLE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CYCLE_FLAG_BYTES); + private static final KeyValue LIMIT_REACHED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, LIMIT_REACHED_FLAG_BYTES); private static final List<KeyValue> SEQUENCE_KV_COLUMNS = Arrays.<KeyValue>asList( CURRENT_VALUE_KV, INCREMENT_BY_KV, CACHE_SIZE_KV, - START_WITH_KV, - // the following three columns were added in 3.1/4.1 + // The following four columns were added in 3.1/4.1 MIN_VALUE_KV, MAX_VALUE_KV, - CYCLE_KV + CYCLE_KV, + LIMIT_REACHED_KV ); static { Collections.sort(SEQUENCE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -87,12 +87,12 @@ public class Sequence { private static final int CURRENT_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CURRENT_VALUE_KV); private static final int INCREMENT_BY_INDEX = SEQUENCE_KV_COLUMNS.indexOf(INCREMENT_BY_KV); private static final int CACHE_SIZE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CACHE_SIZE_KV); - private static final int START_WITH_INDEX = SEQUENCE_KV_COLUMNS.indexOf(START_WITH_KV); private static final int MIN_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(MIN_VALUE_KV); private static final int MAX_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(MAX_VALUE_KV); private static final int CYCLE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CYCLE_KV); + private static final int LIMIT_REACHED_INDEX = SEQUENCE_KV_COLUMNS.indexOf(LIMIT_REACHED_KV); - private static final int NUM_SEQUENCE_KEY_VALUES = SEQUENCE_KV_COLUMNS.size(); + public 
static final int NUM_SEQUENCE_KEY_VALUES = SEQUENCE_KV_COLUMNS.size(); private static final EmptySequenceCacheException EMPTY_SEQUENCE_CACHE_EXCEPTION = new EmptySequenceCacheException(); private final SequenceKey key; @@ -159,7 +159,6 @@ public class Sequence { long returnValue = value.currentValue; if (factor != 0) { - --value.unusedValues; boolean overflowOrUnderflow=false; // advance currentValue while checking for overflow try { @@ -184,7 +183,7 @@ public class Sequence { if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } - if (value.unusedValues == 0) { + if (value.currentValue == value.nextValue) { if (action == ValueOp.VALIDATE_SEQUENCE) { return value.currentValue; } @@ -199,7 +198,7 @@ public class Sequence { } List<Append> appends = Lists.newArrayListWithExpectedSize(values.size()); for (SequenceValue value : values) { - if (value.isInitialized() && value.unusedValues>0) { + if (value.isInitialized() && value.currentValue != value.nextValue) { appends.add(newReturn(value)); } } @@ -211,7 +210,7 @@ public class Sequence { if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } - if (value.unusedValues==0) { + if (value.currentValue == value.nextValue) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } return newReturn(value); @@ -222,11 +221,12 @@ public class Sequence { Append append = new Append(key); byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()}; append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf); - append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PDataType.LONG.toBytes(value.startValue)); + append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PDataType.LONG.toBytes(value.nextValue)); Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap(); familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, Arrays.<Cell>asList( - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, 
ByteUtil.EMPTY_BYTE_ARRAY), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, value.timestamp, PDataType.LONG.toBytes(value.currentValue)) + (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, PDataType.LONG.toBytes(value.currentValue)), + // set LIMIT_REACHED flag to false since we are returning unused sequence values + (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, PDataType.FALSE_BYTES) )); return append; } @@ -300,12 +300,11 @@ public class Sequence { */ private static KeyValue getKeyValue(Result r, KeyValue kv, int cellIndex) { Cell[] cells = r.rawCells(); - // if the sequence row is from a previous version then MIN_VALUE, MAX_VALUE and CYCLE key values are not present, - // the sequence row has only four columns (START_VALUE, INCREMENT_BY, CACHE_SIZE and CURRENT_VALUE) and the order of the cells + // if the sequence row is from a previous version then MIN_VALUE, MAX_VALUE, CYCLE and LIMIT_REACHED key values are not present, + // the sequence row has only three columns (INCREMENT_BY, CACHE_SIZE and CURRENT_VALUE) and the order of the cells // in the array returned by rawCells() is not what what we expect so use getColumnLatestCell() to get the cell we want - Cell cell = cells.length != NUM_SEQUENCE_KEY_VALUES ? - r.getColumnLatestCell(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) - : (cells[cellIndex]); + Cell cell = cells.length == NUM_SEQUENCE_KEY_VALUES ? 
cells[cellIndex] : + r.getColumnLatestCell(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell); } @@ -325,10 +324,6 @@ public class Sequence { return getKeyValue(r, CACHE_SIZE_KV, CACHE_SIZE_INDEX); } - public static KeyValue getStartValueKV(Result r) { - return getKeyValue(r, START_WITH_KV, START_WITH_INDEX); - } - public static KeyValue getMinValueKV(Result r) { return getKeyValue(r, MIN_VALUE_KV, MIN_VALUE_INDEX); } @@ -340,6 +335,10 @@ public class Sequence { public static KeyValue getCycleKV(Result r) { return getKeyValue(r, CYCLE_KV, CYCLE_INDEX); } + + public static KeyValue getLimitReachedKV(Result r) { + return getKeyValue(r, LIMIT_REACHED_KV, LIMIT_REACHED_INDEX); + } public static void replaceCurrentValueKV(List<Cell> kvs, KeyValue currentValueKV) { kvs.set(CURRENT_VALUE_INDEX, currentValueKV); @@ -356,35 +355,36 @@ public class Sequence { public static void replaceCycleValueKV(List<Cell> kvs, KeyValue cycleValueKV) { kvs.set(CYCLE_INDEX, cycleValueKV); } + public static void replaceLimitReachedKV(List<Cell> kvs, KeyValue limitReachedKV) { + kvs.set(LIMIT_REACHED_INDEX, limitReachedKV); + } /** - * Returns a Cell[] for the result row. 
Handles empty MIN_VALUE, MAX_VALUE and CYCLE - * KeyValues if the sequence row is from a previous version + * Returns the KeyValues of r if it contains the expected number of KeyValues, + * else returns a list of KeyValues corresponding to SEQUENCE_KV_COLUMNS */ - public static List<Cell> getCells(Result r) { - // if the sequence row is from a previous version - if (r.rawCells().length == NUM_SEQUENCE_KEY_VALUES ) + public static List<Cell> getCells(Result r, int numKVs) { + // if the sequence row is from a previous version + if (r.rawCells().length == numKVs ) return Lists.newArrayList(r.rawCells()); - // else we need to handle missing MIN_VALUE, MAX_VALUE and CYCLE KeyValues + // else we need to handle missing MIN_VALUE, MAX_VALUE, CYCLE and LIMIT_REACHED KeyValues List<Cell> cellList = Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES); for (KeyValue kv : SEQUENCE_KV_COLUMNS) { cellList.add(getKeyValue(r,kv)); } return cellList; - } + } private static final class SequenceValue { public final long incrementBy; public final long timestamp; + public final long cacheSize; public long currentValue; - // start value of the current batch - public long startValue; + public long nextValue; public long minValue; public long maxValue; public boolean cycle; - // number of values left in current batch - public long unusedValues; public boolean isDeleted; public boolean limitReached; @@ -400,6 +400,7 @@ public class Sequence { this.isDeleted = isDeleted; this.incrementBy = 0; this.limitReached = false; + this.cacheSize = 0; } public boolean isInitialized() { @@ -418,16 +419,14 @@ public class Sequence { KeyValue maxValueKV = getMaxValueKV(r); KeyValue cycleKV = getCycleKV(r); this.timestamp = currentValueKV.getTimestamp(); - this.currentValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault()); + this.nextValue = PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), 
currentValueKV.getValueOffset(), SortOrder.getDefault()); this.incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(), incrementByKV.getValueOffset(), SortOrder.getDefault()); - this.unusedValues = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); + this.cacheSize = PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), cacheSizeKV.getValueOffset(), SortOrder.getDefault()); this.minValue = PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(), minValueKV.getValueOffset(), SortOrder.getDefault()); this.maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(), maxValueKV.getValueOffset(), SortOrder.getDefault()); this.cycle = (Boolean)PDataType.BOOLEAN.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(), cycleKV.getValueLength()); this.limitReached = false; - // store the start value of this batch of sequence values, so that it can be used to - // determine if we can return unused sequence values when we close the connection - this.startValue = this.currentValue; + currentValue = nextValue - incrementBy * cacheSize; } } @@ -467,14 +466,15 @@ public class Sequence { Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap(); byte[] startWithBuf = PDataType.LONG.toBytes(startWith); familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, Arrays.<Cell>asList( - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, PDataType.LONG.toBytes(incrementBy)), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, PDataType.LONG.toBytes(cacheSize)), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(minValue)), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(maxValue)), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PDataType.BOOLEAN.toBytes(cycle)) + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, PDataType.LONG.toBytes(incrementBy)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, PDataType.LONG.toBytes(cacheSize)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(minValue)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, PDataType.LONG.toBytes(maxValue)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PDataType.BOOLEAN.toBytes(cycle)), + KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, PDataType.FALSE_BYTES) )); return append; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java index 26ea132..be4455b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java @@ -17,6 +17,7 @@ public class SequenceInfo { public final long maxValue; public final long cacheSize; public final boolean cycle; + public boolean limitReached; public SequenceInfo(long sequenceValue, long incrementBy, long minValue, long maxValue, long cacheSize, boolean cycle) { this.sequenceValue = sequenceValue; @@ -25,5 +26,6 @@ public class SequenceInfo { this.maxValue = maxValue; this.cacheSize = cacheSize; this.cycle = cycle; + this.limitReached = false; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java index b6d3c67..f29e448 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java @@ -18,13 +18,13 @@ package org.apache.phoenix.trace; import static 
org.apache.phoenix.metrics.MetricInfo.ANNOTATION; -import static org.apache.phoenix.metrics.MetricInfo.TAG; import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION; import static org.apache.phoenix.metrics.MetricInfo.END; import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; import static org.apache.phoenix.metrics.MetricInfo.PARENT; import static org.apache.phoenix.metrics.MetricInfo.SPAN; import static org.apache.phoenix.metrics.MetricInfo.START; +import static org.apache.phoenix.metrics.MetricInfo.TAG; import static org.apache.phoenix.metrics.MetricInfo.TRACE; import java.sql.Connection; @@ -179,7 +179,7 @@ public class PhoenixTableMetricsWriter implements MetricsWriter { for (PhoenixAbstractMetric metric : record.metrics()) { // name of the metric is also the column name to which we write keys.add(MetricInfo.getColumnName(metric.getName())); - values.add((Long) metric.value()); + values.add(metric.value()); } // get the tags out so we can set them later (otherwise, need to be a single value) http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index b8dc32b..3850ca9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -77,17 +77,9 @@ public class KeyValueUtil { value, valueOffset, valueLength); } - public static KeyValue newKeyValue(byte[] key, KeyValue kv, byte[] value) { - return newKeyValue(key, 0, key.length, - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), - kv.getTimestamp(), - value, 0, value.length); - } - - public static KeyValue 
newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) { - return newKeyValue(key,cf,cq,ts,value,0,value.length); - } + public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) { + return newKeyValue(key, cf, cq, ts, value, 0, value.length); + } /** * Binary search for latest column value without allocating memory in the process http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java index f17d721..f97d565 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java @@ -15,7 +15,6 @@ import java.sql.SQLException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.SequenceInfo; -import org.apache.phoenix.schema.SequenceKey; import com.google.common.math.LongMath; @@ -28,36 +27,28 @@ public class SequenceUtil { * Returns the nextValue of a sequence * @throws SQLException if cycle is false and the sequence limit has been reached */ - public static long getNextValue(long currentValue, long minValue, long maxValue, - long incrementBy, long cacheSize, boolean cycle) throws SQLException { + public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue, + long incrementBy, long cacheSize) throws SQLException { long nextValue = 0; boolean increasingSeq = incrementBy > 0 ? 
true : false; - boolean overflowOrUnderflow = false; // advance currentValue while checking for overflow try { long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize); nextValue = LongMath.checkedAdd(currentValue, incrementValue); } catch (ArithmeticException e) { - overflowOrUnderflow = true; + return true; } - // check if overflow or limit was reached - if (overflowOrUnderflow || (increasingSeq && nextValue > maxValue) - || (!increasingSeq && nextValue < minValue)) { - if (cycle) { - nextValue = increasingSeq ? minValue : maxValue; - } else { - SQLExceptionCode code = - increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE - : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; - throw new SQLExceptionInfo.Builder(code).build().buildException(); - } + // check if limit was reached + if ((increasingSeq && nextValue > maxValue) + || (!increasingSeq && nextValue < minValue)) { + return true; } - return nextValue; + return false; } - public static long getNextValue(SequenceKey key, SequenceInfo info) throws SQLException { - return getNextValue(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize, info.cycle); + public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException { + return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy, info.cacheSize); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java index bb1ef49..f25a213 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java @@ -10,11 +10,11 @@ */ package org.apache.phoenix.util; -import 
static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.sql.SQLException; -import org.apache.phoenix.exception.SQLExceptionCode; import org.junit.Test; public class SequenceUtilTest { @@ -25,93 +25,41 @@ public class SequenceUtilTest { @Test public void testAscendingNextValueWithinLimit() throws SQLException { - assertEquals(9, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE)); } @Test public void testAscendingNextValueReachLimit() throws SQLException { - assertEquals(MAX_VALUE, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE)); } @Test - public void testAscendingNextValueGreaterThanMaxValueNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), - e.getErrorCode()); - } - } - - @Test - public void testAscendingNextValueGreaterThanMaxValueCycle() throws SQLException { - assertEquals(MIN_VALUE, SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, - 2/* incrementBy */, CACHE_SIZE, true)); - } - - @Test - public void testAscendingOverflowNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), - e.getErrorCode()); - } + public void testAscendingNextValueGreaterThanMaxValue() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* incrementBy */, 
CACHE_SIZE)); } @Test - public void testAscendingOverflowCycle() throws SQLException { - assertEquals(0, SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, - 1/* incrementBy */, CACHE_SIZE, true)); + public void testAscendingOverflow() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingNextValueWithinLimit() throws SQLException { - assertEquals(2, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingNextValueReachLimit() throws SQLException { - assertEquals(MIN_VALUE, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, - CACHE_SIZE, false)); + assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test - public void testDescendingNextValueLessThanMinValueNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(1, MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(), - e.getErrorCode()); - } - } - - @Test - public void testDescendingNextValueLessThanMinValueCycle() throws SQLException { - assertEquals(MAX_VALUE, SequenceUtil.getNextValue(2, MIN_VALUE, MAX_VALUE, - -2/* incrementBy */, CACHE_SIZE, true)); - } - - @Test - public void testDescendingOverflowNoCycle() throws SQLException { - try { - SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE, - false); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(), - e.getErrorCode()); - } + public void testDescendingNextValueLessThanMinValue() throws SQLException { + assertTrue(SequenceUtil.checkIfLimitReached(2, 
MIN_VALUE, MAX_VALUE, -2/* incrementBy */, CACHE_SIZE)); } @Test public void testDescendingOverflowCycle() throws SQLException { - assertEquals(0, SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, - -1/* incrementBy */, CACHE_SIZE, true)); + assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE)); } }
