Repository: phoenix
Updated Branches:
  refs/heads/4.0 d372c6591 -> aba5ea906


PHOENIX-1016 Support MINVALUE, MAXVALUE, and CYCLE options in CREATE SEQUENCE 
(Thomas D'Silva)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aba5ea90
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aba5ea90
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aba5ea90

Branch: refs/heads/4.0
Commit: aba5ea906424238664ef61dbb04e39de5896cc43
Parents: d372c65
Author: James Taylor <[email protected]>
Authored: Mon Jul 28 20:28:52 2014 -0700
Committer: James Taylor <[email protected]>
Committed: Mon Jul 28 20:28:52 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/SequenceIT.java  |  19 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   2 +-
 .../coprocessor/SequenceRegionObserver.java     | 226 +++++++++++--------
 .../org/apache/phoenix/hbase/index/Indexer.java |  10 -
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   4 +-
 .../query/ConnectionQueryServicesImpl.java      |   6 +-
 .../query/ConnectionlessQueryServicesImpl.java  |  35 ++-
 .../apache/phoenix/query/QueryConstants.java    |   4 +-
 .../org/apache/phoenix/schema/Sequence.java     |  94 ++++----
 .../org/apache/phoenix/schema/SequenceInfo.java |   2 +
 .../trace/PhoenixTableMetricsWriter.java        |   4 +-
 .../org/apache/phoenix/util/KeyValueUtil.java   |  14 +-
 .../org/apache/phoenix/util/SequenceUtil.java   |  29 +--
 .../apache/phoenix/util/SequenceUtilTest.java   |  78 ++-----
 14 files changed, 259 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index 0208d17..3e0301c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SequenceUtil;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,7 +56,7 @@ import com.google.common.collect.Maps;
 public class SequenceIT extends BaseClientManagedTimeIT {
     private static final String NEXT_VAL_SQL = "SELECT NEXT VALUE FOR foo.bar 
FROM SYSTEM.\"SEQUENCE\"";
     private static final long BATCH_SIZE = 3;
-    
+   
     private Connection conn;
     
     @BeforeClass
@@ -69,6 +70,13 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         setUpTestDriver(getUrl(), new 
ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void tearDown() throws Exception {
+        // close any open connection between tests, so that connections are 
not leaked
+       if (conn != null) {
+               conn.close();
+       }
+    }
 
        @Test
        public void testSystemTable() throws Exception {                
@@ -114,7 +122,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertTrue(rs.next());
         assertEquals("ALPHA", rs.getString("sequence_schema"));
         assertEquals("OMEGA", rs.getString("sequence_name"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(4, rs.getInt("increment_by"));
         assertFalse(rs.next());
        }
@@ -152,7 +160,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertTrue(rs.next());
         assertEquals("ALPHA", rs.getString("sequence_schema"));
         assertEquals("OMEGA", rs.getString("sequence_name"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(4, rs.getInt("increment_by"));
         assertFalse(rs.next());
 
@@ -208,7 +216,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
                             "SELECT start_with, current_value, increment_by, 
cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name 
FROM SYSTEM.\"SEQUENCE\"");
         assertTrue(rs.next());
         assertEquals(2, rs.getLong("start_with"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(3, rs.getLong("increment_by"));
         assertEquals(5, rs.getLong("cache_size"));
         assertEquals(0, rs.getLong("min_value"));
@@ -654,7 +662,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         rs = conn.createStatement().executeQuery("SELECT sequence_name, 
current_value FROM SYSTEM.\"SEQUENCE\" WHERE sequence_name='BAR'");
         assertTrue(rs.next());
         assertEquals("BAR", rs.getString(1));
-        assertEquals(null, rs.getBytes(2));
+        assertEquals(1, rs.getInt(2));
         conn.close();
         conn2.close();
 
@@ -690,6 +698,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertEquals(4, rs.getInt(1));
     }
     
+    // if nextConnection() is not used to get a connection, make sure 
you call .close() so that connections are not leaked
     private void nextConnection() throws Exception {
         if (conn != null) conn.close();
         long ts = nextTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 0517895..8e61f1b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -60,7 +60,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     public static final long MIN_TABLE_TIMESTAMP = 0;
     // Each time a column is added to the SYSTEM.CATALOG, this should be 
increased.
     // Adding INDEX_TYPE column for local indexing
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 2;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 3;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
 
     // TODO: pare this down to minimum, as we don't need duplicates for both 
table and column errors, nor should we need

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index f537bd2..9754d00 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -23,6 +23,8 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -54,6 +56,8 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SequenceUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -127,8 +131,7 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
                 for (Map.Entry<byte[], List<Cell>> entry : 
increment.getFamilyCellMap().entrySet()) {
                     byte[] cf = entry.getKey();
                     for (Cell cq : entry.getValue()) {
-                       long value = 
PDataType.LONG.getCodec().decodeLong(cq.getValueArray(), cq.getValueOffset(), 
-                                       SortOrder.getDefault());
+                       long value = Bytes.toLong(cq.getValueArray(), 
cq.getValueOffset());
                         get.addColumn(cf, CellUtil.cloneQualifier(cq));
                         validateOnly &= 
(Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
                     }
@@ -141,109 +144,118 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
                     return result;
                 }
                 
+                KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
                 KeyValue incrementByKV = Sequence.getIncrementByKV(result);
-                KeyValue currentValueKV = Sequence.getCurrentValueKV(result);  
             
-                KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);        
-                KeyValue cycleKV = Sequence.getCycleKV(result);
-                KeyValue minValueKV = Sequence.getMinValueKV(result);
-                KeyValue maxValueKV = Sequence.getMaxValueKV(result);
+                KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
+                
+                long currentValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
+                long incrementBy = 
PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(), 
incrementByKV.getValueOffset(), SortOrder.getDefault());
+                long cacheSize = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
                 
                 // Hold timestamp constant for sequences, so that clients 
always only see the latest
                 // value regardless of when they connect.
-                Put put = new Put(row, currentValueKV.getTimestamp());
-                
-                // create a copy of the key values, used for the new Return
-                List<Cell> newkvs = Sequence.getCells(result);
+                long timestamp = currentValueKV.getTimestamp();
+                               Put put = new Put(row, timestamp);
                 
-                long incrementBy =
-                        
PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(),
-                            incrementByKV.getValueOffset(), 
SortOrder.getDefault());
-                     
-                long cacheSize =
-                        
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(),
-                            cacheSizeKV.getValueOffset(), 
SortOrder.getDefault());
+                               int numIncrementKVs = 
increment.getFamilyCellMap().get(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES).size();
+                // creates the list of KeyValues used for the Result that will 
be returned
+                List<Cell> cells = Sequence.getCells(result, numIncrementKVs);
                 
-                // if the minValue, maxValue, or cycle is null this sequence 
has been upgraded from
-                // a lower version. Set minValue, maxValue and cycle to 
Long.MIN_VALUE, Long.MAX_VALUE and true 
-                // respectively in order to maintain existing behavior and 
also update the KeyValues on the server 
-                long minValue;
-                if (minValueKV == null) {
-                    minValue = Long.MIN_VALUE;
-                    // create new key value for put
-                    byte[] newMinValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                    PDataType.LONG.getCodec().encodeLong(minValue, 
newMinValueBuffer, 0);
-                    KeyValue newMinValueKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                                PhoenixDatabaseMetaData.MIN_VALUE_BYTES, 
currentValueKV.getTimestamp(), newMinValueBuffer);
-                    put.add(newMinValueKV);
-                    // update key value in returned Result
-                    Sequence.replaceMinValueKV(newkvs, newMinValueKV);
-                }
-                else {
-                    minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(),
-                                minValueKV.getValueOffset(), 
SortOrder.getDefault());
-                }           
-                long maxValue;
-                if (maxValueKV == null) {
-                    maxValue = Long.MAX_VALUE;
-                    // create new key value for put
-                    byte[] newMaxValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                    PDataType.LONG.getCodec().encodeLong(maxValue, 
newMaxValueBuffer, 0);
-                    KeyValue newMaxValueKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.MAX_VALUE_BYTES, 
currentValueKV.getTimestamp(), newMaxValueBuffer);
-                    put.add(newMaxValueKV);
-                    // update key value in returned Result
-                    Sequence.replaceMaxValueKV(newkvs, newMaxValueKV);
-                }
-                else {
-                    maxValue =  
PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(),
-                            maxValueKV.getValueOffset(), 
SortOrder.getDefault());
-                }
-                boolean cycle;
-                if (cycleKV == null) {
-                    cycle = false;
-                    // create new key value for put
-                    KeyValue newCycleKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, 
currentValueKV.getTimestamp(), PDataType.FALSE_BYTES);
-                    put.add(newCycleKV);
-                    // update key value in returned Result
-                    Sequence.replaceCycleValueKV(newkvs, newCycleKV);
+                //if client is 3.0/4.0 preserve the old behavior (older 
clients won't have newer columns present in the increment)
+                if (numIncrementKVs != Sequence.NUM_SEQUENCE_KEY_VALUES) {
+                       currentValue += incrementBy * cacheSize;
+                    // Hold timestamp constant for sequences, so that clients 
always only see the latest value
+                    // regardless of when they connect.
+                    KeyValue newCurrentValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
+                    put.add(newCurrentValueKV);
+                    Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
                 }
                 else {
-                    cycle = (Boolean) 
PDataType.BOOLEAN.toObject(cycleKV.getValueArray(),
-                            cycleKV.getValueOffset(), 
cycleKV.getValueLength());
+                       KeyValue cycleKV = Sequence.getCycleKV(result);
+                       KeyValue limitReachedKV = 
Sequence.getLimitReachedKV(result);
+                       KeyValue minValueKV = Sequence.getMinValueKV(result);
+                       KeyValue maxValueKV = Sequence.getMaxValueKV(result);
+                       
+                       boolean increasingSeq = incrementBy > 0 ? true : false;
+                       
+                       // if the minValue, maxValue, cycle and limitReached are 
null this sequence has been upgraded from
+                       // a lower version. Set minValue, maxValue, cycle and 
limitReached to Long.MIN_VALUE, Long.MAX_VALUE, false and false
+                       // respectively in order to maintain existing behavior 
and also update the KeyValues on the server 
+                       boolean limitReached;
+                       if (limitReachedKV == null) {
+                               limitReached = false;
+                               KeyValue newLimitReachedKV = 
createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, 
limitReached, timestamp);
+                               put.add(newLimitReachedKV);
+                               Sequence.replaceLimitReachedKV(cells, 
newLimitReachedKV);
+                       }
+                       else {
+                               limitReached = (Boolean) 
PDataType.BOOLEAN.toObject(limitReachedKV.getValueArray(),
+                                               
limitReachedKV.getValueOffset(), limitReachedKV.getValueLength());
+                       }
+                       long minValue;
+                       if (minValueKV == null) {
+                           minValue = Long.MIN_VALUE;
+                           KeyValue newMinValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp);
+                           put.add(newMinValueKV);
+                           Sequence.replaceMinValueKV(cells, newMinValueKV);
+                       }
+                       else {
+                           minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(),
+                                       minValueKV.getValueOffset(), 
SortOrder.getDefault());
+                       }           
+                       long maxValue;
+                       if (maxValueKV == null) {
+                           maxValue = Long.MAX_VALUE;
+                           KeyValue newMaxValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp);
+                           put.add(newMaxValueKV);
+                           Sequence.replaceMaxValueKV(cells, newMaxValueKV);
+                       }
+                       else {
+                           maxValue =  
PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(),
+                                   maxValueKV.getValueOffset(), 
SortOrder.getDefault());
+                       }
+                       boolean cycle;
+                       if (cycleKV == null) {
+                           cycle = false;
+                           KeyValue newCycleKV = createKeyValue(row, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp);
+                           put.add(newCycleKV);
+                           Sequence.replaceCycleValueKV(cells, newCycleKV);
+                       }
+                       else {
+                           cycle = (Boolean) 
PDataType.BOOLEAN.toObject(cycleKV.getValueArray(),
+                                   cycleKV.getValueOffset(), 
cycleKV.getValueLength());
+                       }
+                       
+                       // cycle back, or return an error, if we have run out of sequence values 
+                                       if (limitReached) {
+                                               if (cycle) {
+                                                       // reset currentValue 
of the Sequence row to minValue/maxValue
+                                                       currentValue = 
increasingSeq ? minValue : maxValue;
+                                               }
+                                               else {
+                                                       SQLExceptionCode code = 
increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+                                                                       : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+                                                       return 
getErrorResult(row, maxTimestamp, code.getErrorCode());
+                                               }
+                                       }
+                       
+                       // check if the limit was reached
+                                       limitReached = 
SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, 
cacheSize);
+                       // update currentValue
+                                       currentValue += incrementBy * cacheSize;
+                                       // update the currentValue of the 
Result row
+                                       KeyValue newCurrentValueKV = 
createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, 
timestamp);
+                           Sequence.replaceCurrentValueKV(cells, 
newCurrentValueKV);
+                           put.add(newCurrentValueKV);
+                                       // persist the computed LIMIT_REACHED 
flag, so that no values beyond the limit are handed out
+                                       KeyValue newLimitReachedKV = 
createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, 
limitReached, timestamp);
+                           put.add(newLimitReachedKV);
                 }
-                long currentValue;
-                // initialize current value to start value
-                if (currentValueKV.getValueLength()==0) {
-                    KeyValue startValueKV = Sequence.getStartValueKV(result);
-                    currentValue =
-                            
PDataType.LONG.getCodec().decodeLong(startValueKV.getValueArray(),
-                                startValueKV.getValueOffset(), 
SortOrder.getDefault());
-                }
-                else {
-                    currentValue =
-                            
PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(),
-                                currentValueKV.getValueOffset(), 
SortOrder.getDefault());      
-                    try {
-                        // set currentValue to nextValue
-                        currentValue =
-                                SequenceUtil.getNextValue(currentValue, 
minValue, maxValue,
-                                    incrementBy, cacheSize, cycle);
-                    } catch (SQLException sqlE) {
-                        return getErrorResult(row, maxTimestamp, 
sqlE.getErrorCode());
-                    }
-                }
-                byte[] newCurrentValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                PDataType.LONG.getCodec().encodeLong(currentValue, 
newCurrentValueBuffer, 0);
-                KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, 
currentValueKV, newCurrentValueBuffer);
-                put.add(newCurrentValueKV);
-                Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV);
-                
                 // update the KeyValues on the server
                 Mutation[] mutations = new Mutation[]{put};
                 region.batchMutate(mutations);
                 // return a Result with the updated KeyValues
-                return Result.create(newkvs);
+                return Result.create(cells);
             } finally {
                 region.releaseRowLocks(locks);
             }
@@ -254,6 +266,36 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
             region.closeRegionOperation();
         }
     }
+    
+       /**
+        * Creates a new KeyValue for a long value
+        * 
+        * @param key
+        *            key used while creating KeyValue
+        * @param cqBytes
+        *            column qualifier of KeyValue
+        * @return return the KeyValue that was created
+        */
+       KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long 
timestamp) {
+               byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
+               PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0);
+               return KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer);
+       }
+    
+       /**
+        * Creates a new KeyValue for a boolean value
+        * 
+        * @param key
+        *            key used while creating KeyValue
+        * @param cqBytes
+        *            column qualifier of KeyValue
+        * @return return the KeyValue that was created
+        */
+       private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean 
value, long timestamp) throws IOException {
+               // create new key value for put
+               return KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, 
+                               timestamp, value ? PDataType.TRUE_BYTES : 
PDataType.FALSE_BYTES);
+       }
 
     /**
      * Override the preAppend for checkAndPut and checkAndDelete, as we need 
the ability to

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index b3bec6e..35eb10d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
@@ -67,7 +66,6 @@ import 
org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import 
org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
 import org.apache.phoenix.trace.TracingCompat;
 import org.apache.phoenix.trace.util.NullSpan;
-import org.apache.phoenix.trace.util.Tracing;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
@@ -140,18 +138,10 @@ public class Indexer extends BaseRegionObserver {
     private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = 
VersionUtil
             .encodeVersion("0.94.9");
 
-    /**
-     * Raw configuration, for tracing. Coprocessors generally will get a 
subset configuration (if
-     * they are on a per-table basis), so we need the raw one from the server, 
so we can get the
-     * actual configuration keys
-     */
-    private Configuration rawConf;
-
   @Override
   public void start(CoprocessorEnvironment e) throws IOException {
       try {
         final RegionCoprocessorEnvironment env = 
(RegionCoprocessorEnvironment) e;
-            this.rawConf = env.getRegionServerServices().getConfiguration();
         String serverName = 
env.getRegionServerServices().getServerName().getServerName();
         if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
           // make sure the right version <-> combinations are allowed.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 1fabf97..93ada7b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -202,7 +202,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData, org.apache.pho
     public static final byte[] CURRENT_VALUE_BYTES = 
Bytes.toBytes(CURRENT_VALUE);
     public static final String START_WITH = "START_WITH";
     public static final byte[] START_WITH_BYTES = Bytes.toBytes(START_WITH);
-    // MIN_VALUE, MAX_VALUE and CYCLE_FLAG were added in 3.0
+    // MIN_VALUE, MAX_VALUE, CYCLE_FLAG and LIMIT_FLAG were added in 3.1/4.1
     public static final String MIN_VALUE = "MIN_VALUE";
     public static final byte[] MIN_VALUE_BYTES = Bytes.toBytes(MIN_VALUE);
     public static final String MAX_VALUE = "MAX_VALUE";
@@ -213,6 +213,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData, org.apache.pho
     public static final byte[] CACHE_SIZE_BYTES = Bytes.toBytes(CACHE_SIZE);
     public static final String CYCLE_FLAG = "CYCLE_FLAG";
     public static final byte[] CYCLE_FLAG_BYTES = Bytes.toBytes(CYCLE_FLAG);
+    public static final String LIMIT_REACHED_FLAG = "LIMIT_REACHED_FLAG";
+    public static final byte[] LIMIT_REACHED_FLAG_BYTES = 
Bytes.toBytes(LIMIT_REACHED_FLAG);
     public static final String KEY_SEQ = "KEY_SEQ";
     public static final byte[] KEY_SEQ_BYTES = Bytes.toBytes(KEY_SEQ);
     public static final String SUPERTABLE_NAME = "SUPERTABLE_NAME";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index cdc7a2a..4dffc39 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.query;
 
 import static com.google.common.io.Closeables.closeQuietly;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
@@ -1481,8 +1482,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 String newColumns =
                                         MIN_VALUE + " " + 
PDataType.LONG.getSqlTypeName() + ", " 
                                                 + MAX_VALUE + " " + 
PDataType.LONG.getSqlTypeName() + ", " 
-                                                + CYCLE_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName();
-                                metaConnection = 
addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
+                                                + CYCLE_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName() + ", " 
+                                                + LIMIT_REACHED_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName();
+                                metaConnection = 
addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, 
                                     
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); 
                             }
                         } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 22bc271..223abb6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -360,18 +360,29 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public void incrementSequences(List<SequenceKey> sequenceKeys, long 
timestamp, long[] values,
             SQLException[] exceptions) throws SQLException {
         int i = 0;
-        for (SequenceKey key : sequenceKeys) {
-            SequenceInfo info = sequenceMap.get(key);
-            if (info == null) {
-                exceptions[i] =
-                        new SequenceNotFoundException(key.getSchemaName(), 
key.getSequenceName());
-            } else {
-                values[i] = info.sequenceValue;
-                info.sequenceValue =
-                        SequenceUtil.getNextValue(key, info);
-            }
-            i++;
-        }
+               for (SequenceKey key : sequenceKeys) {
+                       SequenceInfo info = sequenceMap.get(key);
+                       if (info == null) {
+                               exceptions[i] = new SequenceNotFoundException(
+                                               key.getSchemaName(), 
key.getSequenceName());
+                       } else {
+                               boolean increaseSeq = info.incrementBy > 0;
+                               if (info.limitReached) {
+                                       SQLExceptionCode code = increaseSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+                                                       : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+                                       exceptions[i] = new 
SQLExceptionInfo.Builder(code).build().buildException();
+                               } else {
+                                       values[i] = info.sequenceValue;
+                                       info.sequenceValue += info.incrementBy 
* info.cacheSize;
+                                       info.limitReached = 
SequenceUtil.checkIfLimitReached(info);
+                                       if (info.limitReached && info.cycle) {
+                                               info.sequenceValue = 
increaseSeq ? info.minValue : info.maxValue;
+                                               info.limitReached = false;
+                                       }
+                               }
+                       }
+                       i++;
+               }
         i = 0;
         for (SQLException e : exceptions) {
             if (e != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index a27aae6..07f6612 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -42,6 +42,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
@@ -229,7 +230,8 @@ public interface QueryConstants {
-            //  the following three columns were added in 3.1/4.1
+            //  the following four columns were added in 3.1/4.1
             MIN_VALUE + " BIGINT, \n" + 
             MAX_VALUE + " BIGINT, \n" + 
-            CYCLE_FLAG + " BOOLEAN \n" + 
+            CYCLE_FLAG + " BOOLEAN, \n" + 
+            LIMIT_REACHED_FLAG + " BOOLEAN \n" + 
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + 
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index 7c47a5f..4dff12c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -22,10 +22,10 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -66,19 +66,19 @@ public class Sequence {
     private static final KeyValue CURRENT_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CURRENT_VALUE_BYTES);
     private static final KeyValue INCREMENT_BY_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
INCREMENT_BY_BYTES);
     private static final KeyValue CACHE_SIZE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CACHE_SIZE_BYTES);
-    private static final KeyValue START_WITH_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
START_WITH_BYTES);
     private static final KeyValue MIN_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
MIN_VALUE_BYTES);
     private static final KeyValue MAX_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
MAX_VALUE_BYTES);
     private static final KeyValue CYCLE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CYCLE_FLAG_BYTES);
+    private static final KeyValue LIMIT_REACHED_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
LIMIT_REACHED_FLAG_BYTES);
     private static final List<KeyValue> SEQUENCE_KV_COLUMNS = 
Arrays.<KeyValue>asList(
             CURRENT_VALUE_KV,
             INCREMENT_BY_KV,
             CACHE_SIZE_KV,
-            START_WITH_KV,
-            // the following three columns were added in 3.1/4.1
+            // The following four columns were added in 3.1/4.1
             MIN_VALUE_KV,
             MAX_VALUE_KV,
-            CYCLE_KV
+            CYCLE_KV,
+            LIMIT_REACHED_KV
             );
     static {
         Collections.sort(SEQUENCE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -87,12 +87,12 @@ public class Sequence {
     private static final int CURRENT_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CURRENT_VALUE_KV);
     private static final int INCREMENT_BY_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(INCREMENT_BY_KV);
     private static final int CACHE_SIZE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CACHE_SIZE_KV);
-    private static final int START_WITH_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(START_WITH_KV);
     private static final int MIN_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(MIN_VALUE_KV);
     private static final int MAX_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(MAX_VALUE_KV);
     private static final int CYCLE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CYCLE_KV);
+    private static final int LIMIT_REACHED_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(LIMIT_REACHED_KV);
 
-    private static final int NUM_SEQUENCE_KEY_VALUES = 
SEQUENCE_KV_COLUMNS.size();
+    public static final int NUM_SEQUENCE_KEY_VALUES = 
SEQUENCE_KV_COLUMNS.size();
     private static final EmptySequenceCacheException 
EMPTY_SEQUENCE_CACHE_EXCEPTION = new EmptySequenceCacheException();
     
     private final SequenceKey key;
@@ -159,7 +159,6 @@ public class Sequence {
         
         long returnValue = value.currentValue;
         if (factor != 0) {
-            --value.unusedValues;
             boolean overflowOrUnderflow=false;
             // advance currentValue while checking for overflow
             try {
@@ -184,7 +183,7 @@ public class Sequence {
         if (value == null) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
-        if (value.unusedValues == 0) {
+        if (value.currentValue == value.nextValue) {
             if (action == ValueOp.VALIDATE_SEQUENCE) {
                 return value.currentValue;
             }
@@ -199,7 +198,7 @@ public class Sequence {
         }
         List<Append> appends = 
Lists.newArrayListWithExpectedSize(values.size());
         for (SequenceValue value : values) {
-            if (value.isInitialized() && value.unusedValues>0) {
+            if (value.isInitialized() && value.currentValue != 
value.nextValue) {
                 appends.add(newReturn(value));
             }
         }
@@ -211,7 +210,7 @@ public class Sequence {
         if (value == null) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
-        if (value.unusedValues==0) {
+        if (value.currentValue == value.nextValue) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
         return newReturn(value);
@@ -222,11 +221,12 @@ public class Sequence {
         Append append = new Append(key);
         byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()};
         append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf);
-        append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, 
PDataType.LONG.toBytes(value.startValue));
+        append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, 
PDataType.LONG.toBytes(value.nextValue));
         Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap();
         familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
Arrays.<Cell>asList(
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, 
ByteUtil.EMPTY_BYTE_ARRAY),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.START_WITH_BYTES, value.timestamp, 
PDataType.LONG.toBytes(value.currentValue))
+                       (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, 
PDataType.LONG.toBytes(value.currentValue)),
+                       // set LIMIT_REACHED flag to false since we are 
returning unused sequence values
+                       (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, 
PDataType.FALSE_BYTES)
                 ));
         return append;
     }
@@ -300,12 +300,11 @@ public class Sequence {
      */
     private static KeyValue getKeyValue(Result r, KeyValue kv, int cellIndex) {
         Cell[] cells = r.rawCells();
-        // if the sequence row is from a previous version then MIN_VALUE, 
MAX_VALUE and CYCLE key values are not present,
-        // the sequence row has only four columns (START_VALUE, INCREMENT_BY, 
CACHE_SIZE and CURRENT_VALUE) and the order of the cells 
+        // if the sequence row is from a previous version then MIN_VALUE, 
MAX_VALUE, CYCLE and LIMIT_REACHED key values are not present,
+        // the sequence row has only three columns (INCREMENT_BY, CACHE_SIZE 
and CURRENT_VALUE) and the order of the cells 
-        // in the array returned by rawCells() is not what what we expect so 
use getColumnLatestCell() to get the cell we want
+        // in the array returned by rawCells() is not what we expect so 
use getColumnLatestCell() to get the cell we want
-        Cell cell = cells.length != NUM_SEQUENCE_KEY_VALUES ?
-                r.getColumnLatestCell(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), 
kv.getQualifierOffset(), kv.getQualifierLength()) 
-                : (cells[cellIndex]);
+        Cell cell = cells.length == NUM_SEQUENCE_KEY_VALUES ? cells[cellIndex] 
:
+               r.getColumnLatestCell(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), 
kv.getQualifierOffset(), kv.getQualifierLength());
         return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell);
     }
     
@@ -325,10 +324,6 @@ public class Sequence {
         return getKeyValue(r, CACHE_SIZE_KV, CACHE_SIZE_INDEX);
     }
     
-    public static KeyValue getStartValueKV(Result r) {
-        return getKeyValue(r, START_WITH_KV, START_WITH_INDEX);
-    }
-    
     public static KeyValue getMinValueKV(Result r) {
         return getKeyValue(r, MIN_VALUE_KV, MIN_VALUE_INDEX);
     }
@@ -340,6 +335,10 @@ public class Sequence {
     public static KeyValue getCycleKV(Result r) {
         return getKeyValue(r, CYCLE_KV, CYCLE_INDEX);
     }
+
+    public static KeyValue getLimitReachedKV(Result r) {
+        return getKeyValue(r, LIMIT_REACHED_KV, LIMIT_REACHED_INDEX);
+    }
     
     public static void replaceCurrentValueKV(List<Cell> kvs, KeyValue 
currentValueKV) {
         kvs.set(CURRENT_VALUE_INDEX, currentValueKV);
@@ -356,35 +355,36 @@ public class Sequence {
     public static void replaceCycleValueKV(List<Cell> kvs, KeyValue 
cycleValueKV) {
         kvs.set(CYCLE_INDEX, cycleValueKV);
     }
+    public static void replaceLimitReachedKV(List<Cell> kvs, KeyValue 
limitReachedKV) {
+        kvs.set(LIMIT_REACHED_INDEX, limitReachedKV);
+    }
     
     /**
-     * Returns a Cell[] for the result row. Handles empty MIN_VALUE, MAX_VALUE 
and CYCLE
-     * KeyValues if the sequence row is from a previous version 
+     * Returns the KeyValues of r if it contains the expected number of 
KeyValues,
+     * else returns a list of KeyValues corresponding to SEQUENCE_KV_COLUMNS 
      */
-    public static List<Cell> getCells(Result r) {
-        // if the sequence row is from a previous version 
-        if (r.rawCells().length == NUM_SEQUENCE_KEY_VALUES )
+    public static List<Cell> getCells(Result r, int numKVs) {
+        // if the sequence row is from a previous version
+        if (r.rawCells().length == numKVs )
             return Lists.newArrayList(r.rawCells());
-        // else we need to handle missing MIN_VALUE, MAX_VALUE and CYCLE 
KeyValues
+        // else we need to handle missing MIN_VALUE, MAX_VALUE, CYCLE and 
LIMIT_REACHED KeyValues
         List<Cell> cellList = 
Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES);
         for (KeyValue kv : SEQUENCE_KV_COLUMNS) {
             cellList.add(getKeyValue(r,kv));
         }
         return cellList;
-    }
+    }    
     
     private static final class SequenceValue {
         public final long incrementBy;
         public final long timestamp;
+        public final long cacheSize;
         
         public long currentValue;
-        // start value of the current batch 
-        public long startValue;
+        public long nextValue;
         public long minValue;
         public long maxValue;
         public boolean cycle;
-        // number of values left in current batch
-        public long unusedValues;
         public boolean isDeleted;
         public boolean limitReached;
         
@@ -400,6 +400,7 @@ public class Sequence {
             this.isDeleted = isDeleted;
             this.incrementBy = 0;
             this.limitReached = false;
+            this.cacheSize = 0;
         }
         
         public boolean isInitialized() {
@@ -418,16 +419,14 @@ public class Sequence {
             KeyValue maxValueKV = getMaxValueKV(r);
             KeyValue cycleKV = getCycleKV(r);
             this.timestamp = currentValueKV.getTimestamp();
-            this.currentValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
+            this.nextValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getValueArray(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
             this.incrementBy = 
PDataType.LONG.getCodec().decodeLong(incrementByKV.getValueArray(), 
incrementByKV.getValueOffset(), SortOrder.getDefault());
-            this.unusedValues = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
+            this.cacheSize = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getValueArray(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
             this.minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getValueArray(), 
minValueKV.getValueOffset(), SortOrder.getDefault());
             this.maxValue = 
PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(), 
maxValueKV.getValueOffset(), SortOrder.getDefault());
             this.cycle = 
(Boolean)PDataType.BOOLEAN.toObject(cycleKV.getValueArray(), 
cycleKV.getValueOffset(), cycleKV.getValueLength());
             this.limitReached = false;
-            // store the start value of this batch of sequence values, so that 
it can be used to
-            // determine if we can return unused sequence values when we close 
the connection
-            this.startValue = this.currentValue;
+            currentValue = nextValue - incrementBy * cacheSize;
         }
     }
 
@@ -467,14 +466,15 @@ public class Sequence {
         Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap();
         byte[] startWithBuf = PDataType.LONG.toBytes(startWith);
         familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
Arrays.<Cell>asList(
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, 
ByteUtil.EMPTY_BYTE_ARRAY),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, 
PDataType.LONG.toBytes(incrementBy)),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, 
PDataType.LONG.toBytes(cacheSize)),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(minValue)),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(maxValue)),
-                (Cell)KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, 
PDataType.BOOLEAN.toBytes(cycle))
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, 
PDataType.LONG.toBytes(incrementBy)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, 
PDataType.LONG.toBytes(cacheSize)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(minValue)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(maxValue)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, 
PDataType.BOOLEAN.toBytes(cycle)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, 
PDataType.FALSE_BYTES)
                 ));
         return append;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
index 26ea132..be4455b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
@@ -17,6 +17,7 @@ public class SequenceInfo {
     public final long maxValue;
     public final long cacheSize;
     public final boolean cycle;
+    public boolean limitReached;
 
     public SequenceInfo(long sequenceValue, long incrementBy, long minValue, 
long maxValue, long cacheSize, boolean cycle) {
         this.sequenceValue = sequenceValue;
@@ -25,5 +26,6 @@ public class SequenceInfo {
         this.maxValue = maxValue;
         this.cacheSize = cacheSize;
         this.cycle = cycle;
+        this.limitReached = false;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java
index b6d3c67..f29e448 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java
@@ -18,13 +18,13 @@
 package org.apache.phoenix.trace;
 
 import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
-import static org.apache.phoenix.metrics.MetricInfo.TAG;
 import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
 import static org.apache.phoenix.metrics.MetricInfo.END;
 import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
 import static org.apache.phoenix.metrics.MetricInfo.PARENT;
 import static org.apache.phoenix.metrics.MetricInfo.SPAN;
 import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
 import static org.apache.phoenix.metrics.MetricInfo.TRACE;
 
 import java.sql.Connection;
@@ -179,7 +179,7 @@ public class PhoenixTableMetricsWriter implements 
MetricsWriter {
         for (PhoenixAbstractMetric metric : record.metrics()) {
             // name of the metric is also the column name to which we write
             keys.add(MetricInfo.getColumnName(metric.getName()));
-            values.add((Long) metric.value());
+            values.add(metric.value());
         }
 
         // get the tags out so we can set them later (otherwise, need to be a 
single value)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index b8dc32b..3850ca9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -77,17 +77,9 @@ public class KeyValueUtil {
                 value, valueOffset, valueLength);
     }
     
-    public static KeyValue newKeyValue(byte[] key, KeyValue kv, byte[] value) {
-        return newKeyValue(key, 0, key.length,
-            kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), 
-            kv.getQualifierArray(), kv.getQualifierOffset(), 
kv.getQualifierLength(), 
-            kv.getTimestamp(),
-            value, 0, value.length);
-    }
-
-    public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long 
ts, byte[] value) {
-        return newKeyValue(key,cf,cq,ts,value,0,value.length);
-    }
+       public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, 
long ts, byte[] value) {
+               return newKeyValue(key, cf, cq, ts, value, 0, value.length);
+       }
 
     /**
      * Binary search for latest column value without allocating memory in the 
process

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
index f17d721..f97d565 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
@@ -15,7 +15,6 @@ import java.sql.SQLException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.SequenceInfo;
-import org.apache.phoenix.schema.SequenceKey;
 
 import com.google.common.math.LongMath;
 
@@ -28,36 +27,28 @@ public class SequenceUtil {
      * Returns the nextValue of a sequence 
      * @throws SQLException if cycle is false and the sequence limit has been 
reached
      */
-    public static long getNextValue(long currentValue, long minValue, long 
maxValue,
-            long incrementBy, long cacheSize, boolean cycle) throws 
SQLException {
+    public static boolean checkIfLimitReached(long currentValue, long 
minValue, long maxValue,
+            long incrementBy, long cacheSize) throws SQLException {
         long nextValue = 0;
         boolean increasingSeq = incrementBy > 0 ? true : false;
-        boolean overflowOrUnderflow = false;
         // advance currentValue while checking for overflow    
         try {
             long incrementValue = LongMath.checkedMultiply(incrementBy, 
cacheSize);
             nextValue = LongMath.checkedAdd(currentValue, incrementValue);
         } catch (ArithmeticException e) {
-            overflowOrUnderflow = true;
+            return true;
         }
 
-        // check if overflow or limit was reached
-        if (overflowOrUnderflow || (increasingSeq && nextValue > maxValue)
-                || (!increasingSeq && nextValue < minValue)) {
-            if (cycle) {
-                nextValue = increasingSeq ? minValue : maxValue;
-            } else {
-                SQLExceptionCode code =
-                        increasingSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
-                                : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
-                throw new 
SQLExceptionInfo.Builder(code).build().buildException();
-            }
+        // check if limit was reached
+               if ((increasingSeq && nextValue > maxValue)
+                               || (!increasingSeq && nextValue < minValue)) {
+            return true;
         }
-        return nextValue;
+        return false;
     }
     
-    public static long getNextValue(SequenceKey key, SequenceInfo info) throws 
SQLException {
-        return getNextValue(info.sequenceValue, info.minValue, info.maxValue, 
info.incrementBy, info.cacheSize, info.cycle);
+    public static boolean checkIfLimitReached(SequenceInfo info) throws 
SQLException {
+        return checkIfLimitReached(info.sequenceValue, info.minValue, 
info.maxValue, info.incrementBy, info.cacheSize);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aba5ea90/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
index bb1ef49..f25a213 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
@@ -10,11 +10,11 @@
  */
 package org.apache.phoenix.util;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.SQLException;
 
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.junit.Test;
 
 public class SequenceUtilTest {
@@ -25,93 +25,41 @@ public class SequenceUtilTest {
 
     @Test
     public void testAscendingNextValueWithinLimit() throws SQLException {
-        assertEquals(9, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, 2/* 
incrementBy */,
-            CACHE_SIZE, false));
+        assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 
2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testAscendingNextValueReachLimit() throws SQLException {
-        assertEquals(MAX_VALUE, SequenceUtil.getNextValue(6, MIN_VALUE, 
MAX_VALUE, 2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 
2/* incrementBy */,  CACHE_SIZE));
     }
 
     @Test
-    public void testAscendingNextValueGreaterThanMaxValueNoCycle() throws 
SQLException {
-        try {
-            SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
-    }
-
-    @Test
-    public void testAscendingNextValueGreaterThanMaxValueCycle() throws 
SQLException {
-        assertEquals(MIN_VALUE, SequenceUtil.getNextValue(MAX_VALUE, 
MIN_VALUE, MAX_VALUE,
-            2/* incrementBy */, CACHE_SIZE, true));
-    }
-    
-    @Test
-    public void testAscendingOverflowNoCycle() throws SQLException {
-        try {
-            SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
+    public void testAscendingNextValueGreaterThanMaxValue() throws 
SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, 
MAX_VALUE, 2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
-    public void testAscendingOverflowCycle() throws SQLException {
-        assertEquals(0, SequenceUtil.getNextValue(Long.MAX_VALUE, 0, 
Long.MAX_VALUE,
-            1/* incrementBy */, CACHE_SIZE, true));
+    public void testAscendingOverflow() throws SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, 
Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE));
     }
 
     @Test
     public void testDescendingNextValueWithinLimit() throws SQLException {
-        assertEquals(2, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testDescendingNextValueReachLimit() throws SQLException {
-        assertEquals(MIN_VALUE, SequenceUtil.getNextValue(5, MIN_VALUE, 
MAX_VALUE, -2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
 
     @Test
-    public void testDescendingNextValueLessThanMinValueNoCycle() throws 
SQLException {
-        try {
-            SequenceUtil.getNextValue(1, MIN_VALUE, MAX_VALUE, -2/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
-    }
-
-    @Test
-    public void testDescendingNextValueLessThanMinValueCycle() throws 
SQLException {
-        assertEquals(MAX_VALUE, SequenceUtil.getNextValue(2, MIN_VALUE, 
MAX_VALUE,
-            -2/* incrementBy */, CACHE_SIZE, true));
-    }
-    
-    @Test
-    public void testDescendingOverflowNoCycle() throws SQLException {
-        try {
-            SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
+    public void testDescendingNextValueLessThanMinValue() throws SQLException {
+       assertTrue(SequenceUtil.checkIfLimitReached(2, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testDescendingOverflowCycle() throws SQLException {
-        assertEquals(0, SequenceUtil.getNextValue(Long.MIN_VALUE, 
Long.MIN_VALUE, 0,
-            -1/* incrementBy */, CACHE_SIZE, true));
+       assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, 
Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE));
     }
 }

Reply via email to