Repository: phoenix
Updated Branches:
  refs/heads/3.0 ef27a9f56 -> 7cc2b5a54


PHOENIX-1016 Support MINVALUE, MAXVALUE, and CYCLE options in CREATE SEQUENCE 
(Thomas D'Silva)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7cc2b5a5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7cc2b5a5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7cc2b5a5

Branch: refs/heads/3.0
Commit: 7cc2b5a54e98b7db990c2b81bcc269c49509ce26
Parents: ef27a9f
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Jul 28 19:46:50 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Jul 28 19:46:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/SequenceIT.java  |  19 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   2 +-
 .../coprocessor/SequenceRegionObserver.java     | 223 +++++++++++--------
 ...SkipRangeParallelIteratorRegionSplitter.java |   8 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   4 +-
 .../query/ConnectionQueryServicesImpl.java      |   4 +-
 .../query/ConnectionlessQueryServicesImpl.java  |  38 +++-
 .../apache/phoenix/query/QueryConstants.java    |   4 +-
 .../org/apache/phoenix/schema/Sequence.java     |  85 ++++---
 .../org/apache/phoenix/schema/SequenceInfo.java |   2 +
 .../org/apache/phoenix/util/KeyValueUtil.java   |   8 -
 .../org/apache/phoenix/util/SequenceUtil.java   |  29 +--
 .../apache/phoenix/util/SequenceUtilTest.java   |  78 ++-----
 13 files changed, 262 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index 0208d17..78f19a7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SequenceUtil;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,7 +56,7 @@ import com.google.common.collect.Maps;
 public class SequenceIT extends BaseClientManagedTimeIT {
     private static final String NEXT_VAL_SQL = "SELECT NEXT VALUE FOR foo.bar 
FROM SYSTEM.\"SEQUENCE\"";
     private static final long BATCH_SIZE = 3;
-    
+   
     private Connection conn;
     
     @BeforeClass
@@ -69,6 +70,13 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         setUpTestDriver(getUrl(), new 
ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void tearDown() throws Exception {
+       // close any open connection between tests, so that connections are not 
leaked
+       if (conn != null) {
+               conn.close();
+       }
+    }
 
        @Test
        public void testSystemTable() throws Exception {                
@@ -114,7 +122,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertTrue(rs.next());
         assertEquals("ALPHA", rs.getString("sequence_schema"));
         assertEquals("OMEGA", rs.getString("sequence_name"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(4, rs.getInt("increment_by"));
         assertFalse(rs.next());
        }
@@ -152,7 +160,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertTrue(rs.next());
         assertEquals("ALPHA", rs.getString("sequence_schema"));
         assertEquals("OMEGA", rs.getString("sequence_name"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(4, rs.getInt("increment_by"));
         assertFalse(rs.next());
 
@@ -208,7 +216,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
                             "SELECT start_with, current_value, increment_by, 
cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name 
FROM SYSTEM.\"SEQUENCE\"");
         assertTrue(rs.next());
         assertEquals(2, rs.getLong("start_with"));
-        assertEquals(null, rs.getBytes("current_value"));
+        assertEquals(2, rs.getInt("current_value"));
         assertEquals(3, rs.getLong("increment_by"));
         assertEquals(5, rs.getLong("cache_size"));
         assertEquals(0, rs.getLong("min_value"));
@@ -654,7 +662,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         rs = conn.createStatement().executeQuery("SELECT sequence_name, 
current_value FROM SYSTEM.\"SEQUENCE\" WHERE sequence_name='BAR'");
         assertTrue(rs.next());
         assertEquals("BAR", rs.getString(1));
-        assertEquals(null, rs.getBytes(2));
+        assertEquals(1, rs.getInt(2));
         conn.close();
         conn2.close();
 
@@ -690,6 +698,7 @@ public class SequenceIT extends BaseClientManagedTimeIT {
         assertEquals(4, rs.getInt(1));
     }
     
+    // if nextConnection() is not used to get to get a connection, make sure 
you call .close() so that connections are not leaked
     private void nextConnection() throws Exception {
         if (conn != null) conn.close();
         long ts = nextTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index cd4bd5e..a303b95 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -64,7 +64,7 @@ public interface MetaDataProtocol extends CoprocessorProtocol 
{
     public static final long MIN_TABLE_TIMESTAMP = 0;
     // Incremented with the addition of INDEX_TYPE to SYSTEM.CATALOG (though 
it's unused in 3.0)
     // plus the addition of MIN_VALUE, MAX_VALUE, and CYCLE to SYSTEM.SEQUENCE.
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 2;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 3;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
 
     // TODO: pare this down to minimum, as we don't need duplicates for both 
table and column errors, nor should we need

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index cc3c0b4..944c4e6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -53,6 +54,10 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SequenceUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
 
 /**
  * 
@@ -82,6 +87,7 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
                         PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
                         QueryConstants.EMPTY_COLUMN_BYTES, timestamp, 
errorCodeBuf)));
     }
+    
     /**
      * Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in 
Increment
      * implementation (HBASE-10254):
@@ -125,104 +131,113 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
                     return result;
                 }
                 
+                KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
                 KeyValue incrementByKV = Sequence.getIncrementByKV(result);
-                KeyValue currentValueKV = Sequence.getCurrentValueKV(result);  
             
-                KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);        
-                KeyValue cycleKV = Sequence.getCycleKV(result);
-                KeyValue minValueKV = Sequence.getMinValueKV(result);
-                KeyValue maxValueKV = Sequence.getMaxValueKV(result);
+                KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
+                
+                long currentValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
+                long incrementBy = 
PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), 
incrementByKV.getValueOffset(), SortOrder.getDefault());
+                long cacheSize = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
                 
                 // Hold timestamp constant for sequences, so that clients 
always only see the latest
                 // value regardless of when they connect.
-                Put put = new Put(row, currentValueKV.getTimestamp());
-                
-                // create a copy of the key values, used for the new Return
-                List<KeyValue> newkvs = Sequence.getCells(result);
+                long timestamp = currentValueKV.getTimestamp();
+                               Put put = new Put(row, timestamp);
                 
-                long incrementBy =
-                        
PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(),
-                            incrementByKV.getValueOffset(), 
SortOrder.getDefault());
-                     
-                long cacheSize =
-                        
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(),
-                            cacheSizeKV.getValueOffset(), 
SortOrder.getDefault());
+                               int numIncrementKVs = 
increment.getFamilyMap().get(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES).size();
+                // creates the list of KeyValues used for the Result that will 
be returned
+                List<KeyValue> newkvs = Sequence.getKeyValueList(result, 
numIncrementKVs);
                 
-                // if the minValue, maxValue, or cycle is null this sequence 
has been upgraded from
-                // a lower version. Set minValue, maxValue and cycle to 
Long.MIN_VALUE, Long.MAX_VALUE and true 
-                // respectively in order to maintain existing behavior and 
also update the KeyValues on the server 
-                long minValue;
-                if (minValueKV == null) {
-                    minValue = Long.MIN_VALUE;
-                    // create new key value for put
-                    byte[] newMinValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                    PDataType.LONG.getCodec().encodeLong(minValue, 
newMinValueBuffer, 0);
-                    KeyValue newMinValueKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                                PhoenixDatabaseMetaData.MIN_VALUE_BYTES, 
currentValueKV.getTimestamp(), newMinValueBuffer);
-                    put.add(newMinValueKV);
-                    // update key value in returned Result
-                    Sequence.replaceMinValueKV(newkvs, newMinValueKV);
-                }
-                else {
-                    minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(),
-                                minValueKV.getValueOffset(), 
SortOrder.getDefault());
-                }           
-                long maxValue;
-                if (maxValueKV == null) {
-                    maxValue = Long.MAX_VALUE;
-                    // create new key value for put
-                    byte[] newMaxValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                    PDataType.LONG.getCodec().encodeLong(maxValue, 
newMaxValueBuffer, 0);
-                    KeyValue newMaxValueKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.MAX_VALUE_BYTES, 
currentValueKV.getTimestamp(), newMaxValueBuffer);
-                    put.add(newMaxValueKV);
-                    // update key value in returned Result
-                    Sequence.replaceMaxValueKV(newkvs, newMaxValueKV);
-                }
-                else {
-                    maxValue =  
PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(),
-                            maxValueKV.getValueOffset(), 
SortOrder.getDefault());
-                }
-                boolean cycle;
-                if (cycleKV == null) {
-                    cycle = false;
-                    // create new key value for put
-                    KeyValue newCycleKV = KeyValueUtil.newKeyValue(row, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, 
currentValueKV.getTimestamp(), PDataType.FALSE_BYTES);
-                    put.add(newCycleKV);
-                    // update key value in returned Result
-                    Sequence.replaceCycleValueKV(newkvs, newCycleKV);
-                }
-                else {
-                    cycle = (Boolean) 
PDataType.BOOLEAN.toObject(cycleKV.getBuffer(),
-                            cycleKV.getValueOffset(), 
cycleKV.getValueLength());
-                }
-                long currentValue;
-                // initialize current value to start value
-                if (currentValueKV.getValueLength()==0) {
-                    KeyValue startValueKV = Sequence.getStartValueKV(result);
-                    currentValue =
-                            
PDataType.LONG.getCodec().decodeLong(startValueKV.getBuffer(),
-                                startValueKV.getValueOffset(), 
SortOrder.getDefault());
+                //if client is 3.0/4.0 preserve the old behavior (older 
clients won't have newer columns present in the increment)
+                if (numIncrementKVs != Sequence.NUM_SEQUENCE_KEY_VALUES) {
+                       currentValue += incrementBy * cacheSize;
+                    // Hold timestamp constant for sequences, so that clients 
always only see the latest value
+                    // regardless of when they connect.
+                    KeyValue newCurrentValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
+                    put.add(newCurrentValueKV);
+                    Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV);
                 }
                 else {
-                    currentValue =
-                            
PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(),
-                                currentValueKV.getValueOffset(), 
SortOrder.getDefault());      
-                    try {
-                        // set currentValue to nextValue
-                        currentValue =
-                                SequenceUtil.getNextValue(currentValue, 
minValue, maxValue,
-                                    incrementBy, cacheSize, cycle);
-                    } catch (SQLException sqlE) {
-                        return getErrorResult(row, maxTimestamp, 
sqlE.getErrorCode());
-                    }
+                       KeyValue cycleKV = Sequence.getCycleKV(result);
+                       KeyValue limitReachedKV = 
Sequence.getLimitReachedKV(result);
+                       KeyValue minValueKV = Sequence.getMinValueKV(result);
+                       KeyValue maxValueKV = Sequence.getMaxValueKV(result);
+                       
+                       boolean increasingSeq = incrementBy > 0 ? true : false;
+                       
+                       // if the minValue, maxValue, cycle and limitReached is 
null this sequence has been upgraded from
+                       // a lower version. Set minValue, maxValue, cycle and 
limitReached to Long.MIN_VALUE, Long.MAX_VALUE, true and false
+                       // respectively in order to maintain existing behavior 
and also update the KeyValues on the server 
+                       boolean limitReached;
+                       if (limitReachedKV == null) {
+                               limitReached = false;
+                               KeyValue newLimitReachedKV = 
createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, 
limitReached, timestamp);
+                               put.add(newLimitReachedKV);
+                               Sequence.replaceLimitReachedKV(newkvs, 
newLimitReachedKV);
+                       }
+                       else {
+                               limitReached = (Boolean) 
PDataType.BOOLEAN.toObject(limitReachedKV.getBuffer(),
+                                               
limitReachedKV.getValueOffset(), limitReachedKV.getValueLength());
+                       }
+                       long minValue;
+                       if (minValueKV == null) {
+                           minValue = Long.MIN_VALUE;
+                           KeyValue newMinValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp);
+                           put.add(newMinValueKV);
+                           Sequence.replaceMinValueKV(newkvs, newMinValueKV);
+                       }
+                       else {
+                           minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(),
+                                       minValueKV.getValueOffset(), 
SortOrder.getDefault());
+                       }           
+                       long maxValue;
+                       if (maxValueKV == null) {
+                           maxValue = Long.MAX_VALUE;
+                           KeyValue newMaxValueKV = createKeyValue(row, 
PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp);
+                           put.add(newMaxValueKV);
+                           Sequence.replaceMaxValueKV(newkvs, newMaxValueKV);
+                       }
+                       else {
+                           maxValue =  
PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(),
+                                   maxValueKV.getValueOffset(), 
SortOrder.getDefault());
+                       }
+                       boolean cycle;
+                       if (cycleKV == null) {
+                           cycle = false;
+                           KeyValue newCycleKV = createKeyValue(row, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp);
+                           put.add(newCycleKV);
+                           Sequence.replaceCycleValueKV(newkvs, newCycleKV);
+                       }
+                       else {
+                           cycle = (Boolean) 
PDataType.BOOLEAN.toObject(cycleKV.getBuffer(),
+                                   cycleKV.getValueOffset(), 
cycleKV.getValueLength());
+                       }
+                       
+                       // return if we have run out of sequence values 
+                                       if (limitReached) {
+                                               if (cycle) {
+                                                       // reset currentValue 
of the Sequence row to minValue/maxValue
+                                                       currentValue = 
increasingSeq ? minValue : maxValue;
+                                               }
+                                               else {
+                                                       SQLExceptionCode code = 
increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+                                                                       : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+                                                       return 
getErrorResult(row, maxTimestamp, code.getErrorCode());
+                                               }
+                                       }
+                       
+                       // check if the limit was reached
+                                       limitReached = 
SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, 
cacheSize);
+                       // update currentValue
+                                       currentValue += incrementBy * cacheSize;
+                                       // update the currentValue of the 
Result row
+                                       KeyValue newCurrentValueKV = 
createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, 
timestamp);
+                           Sequence.replaceCurrentValueKV(newkvs, 
newCurrentValueKV);
+                           put.add(newCurrentValueKV);
+                                       // set the LIMIT_REACHED column to 
true, so that no new values can be used
+                                       KeyValue newLimitReachedKV = 
createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, 
limitReached, timestamp);
+                           put.add(newLimitReachedKV);
                 }
-                byte[] newCurrentValueBuffer = new 
byte[PDataType.LONG.getByteSize()];
-                PDataType.LONG.getCodec().encodeLong(currentValue, 
newCurrentValueBuffer, 0);
-                KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, 
currentValueKV, newCurrentValueBuffer);
-                put.add(newCurrentValueKV);
-                Sequence.replaceCurrentValueKV(newkvs, newCurrentValueKV);
-                
                 // update the KeyValues on the server
                 @SuppressWarnings("unchecked")
                 Pair<Mutation,Integer>[] mutations = new Pair[1];
@@ -240,6 +255,36 @@ public class SequenceRegionObserver extends 
BaseRegionObserver {
             region.closeRegionOperation();
         }
     }
+    
+       /**
+        * Creates a new KeyValue for a long value
+        * 
+        * @param key
+        *            key used while creating KeyValue
+        * @param cqBytes
+        *            column qualifier of KeyValue
+        * @return return the KeyValue that was created
+        */
+       KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long 
timestamp) {
+               byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
+               PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0);
+               return KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer);
+       }
+    
+       /**
+        * Creates a new KeyValue for a boolean value and adds it to the given 
put
+        * 
+        * @param key
+        *            key used while creating KeyValue
+        * @param cqBytes
+        *            column qualifier of KeyValue
+        * @return return the KeyValue that was created
+        */
+       private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean 
value, long timestamp) throws IOException {
+               // create new key value for put
+               return KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, cqBytes, 
+                               timestamp, value ? PDataType.TRUE_BYTES : 
PDataType.FALSE_BYTES);
+       }
 
     /**
      * Override the preAppend for checkAndPut and checkAndDelete, as we need 
the ability to

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
index 8312fe7..3c3f933 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
@@ -21,10 +21,6 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HRegionLocation;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.parse.HintNode;
@@ -32,6 +28,10 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 
 /**
  * Split the region according to the information contained in the scan's 
SkipScanFilter.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index de799ce..310ec03 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -199,7 +199,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData, org.apache.pho
     public static final byte[] CURRENT_VALUE_BYTES = 
Bytes.toBytes(CURRENT_VALUE);
     public static final String START_WITH = "START_WITH";
     public static final byte[] START_WITH_BYTES = Bytes.toBytes(START_WITH);
-    // MIN_VALUE, MAX_VALUE and CYCLE_FLAG were added in 3.0
+    // MIN_VALUE, MAX_VALUE, CYCLE_FLAG and LIMIT_FLAG were added in 3.1/4.1
     public static final String MIN_VALUE = "MIN_VALUE";
     public static final byte[] MIN_VALUE_BYTES = Bytes.toBytes(MIN_VALUE);
     public static final String MAX_VALUE = "MAX_VALUE";
@@ -210,6 +210,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData, org.apache.pho
     public static final byte[] CACHE_SIZE_BYTES = Bytes.toBytes(CACHE_SIZE);
     public static final String CYCLE_FLAG = "CYCLE_FLAG";
     public static final byte[] CYCLE_FLAG_BYTES = Bytes.toBytes(CYCLE_FLAG);
+    public static final String LIMIT_REACHED_FLAG = "LIMIT_REACHED_FLAG";
+    public static final byte[] LIMIT_REACHED_FLAG_BYTES = 
Bytes.toBytes(LIMIT_REACHED_FLAG);
     public static final String KEY_SEQ = "KEY_SEQ";
     public static final byte[] KEY_SEQ_BYTES = Bytes.toBytes(KEY_SEQ);
     public static final String SUPERTABLE_NAME = "SUPERTABLE_NAME";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b13ced2..e6c99fe 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.query;
 
 import static com.google.common.io.Closeables.closeQuietly;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
@@ -1301,7 +1302,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 String newColumns =
                                         MIN_VALUE + " " + 
PDataType.LONG.getSqlTypeName() + ", " 
                                                 + MAX_VALUE + " " + 
PDataType.LONG.getSqlTypeName() + ", " 
-                                                + CYCLE_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName();
+                                                + CYCLE_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName() + ", " 
+                                                + LIMIT_REACHED_FLAG + " " + 
PDataType.BOOLEAN.getSqlTypeName();
                                 metaConnection = 
addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, 
                                     
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); 
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 1c1bba7..ee4c577 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -49,6 +50,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PMetaDataImpl;
@@ -63,6 +65,7 @@ import 
org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceInfo;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
@@ -356,18 +359,29 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public void incrementSequences(List<SequenceKey> sequenceKeys, long 
timestamp, long[] values,
             SQLException[] exceptions) throws SQLException {
         int i = 0;
-        for (SequenceKey key : sequenceKeys) {
-            SequenceInfo info = sequenceMap.get(key);
-            if (info == null) {
-                exceptions[i] =
-                        new SequenceNotFoundException(key.getSchemaName(), 
key.getSequenceName());
-            } else {
-                values[i] = info.sequenceValue;
-                info.sequenceValue =
-                        SequenceUtil.getNextValue(key, info);
-            }
-            i++;
-        }
+               for (SequenceKey key : sequenceKeys) {
+                       SequenceInfo info = sequenceMap.get(key);
+                       if (info == null) {
+                               exceptions[i] = new SequenceNotFoundException(
+                                               key.getSchemaName(), 
key.getSequenceName());
+                       } else {
+                               boolean increaseSeq = info.incrementBy > 0;
+                               if (info.limitReached) {
+                                       SQLExceptionCode code = increaseSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+                                                       : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+                                       exceptions[i] = new 
SQLExceptionInfo.Builder(code).build().buildException();
+                               } else {
+                                       values[i] = info.sequenceValue;
+                                       info.sequenceValue += info.incrementBy 
* info.cacheSize;
+                                       info.limitReached = 
SequenceUtil.checkIfLimitReached(info);
+                                       if (info.limitReached && info.cycle) {
+                                               info.sequenceValue = 
increaseSeq ? info.minValue : info.maxValue;
+                                               info.limitReached = false;
+                                       }
+                               }
+                       }
+                       i++;
+               }
         i = 0;
         for (SQLException e : exceptions) {
             if (e != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 20ab137..107b03f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -41,6 +41,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
@@ -227,7 +228,8 @@ public interface QueryConstants {
             //  the following three columns were added in 3.0/4.0
             MIN_VALUE + " BIGINT, \n" + 
             MAX_VALUE + " BIGINT, \n" + 
-            CYCLE_FLAG + " BOOLEAN \n" + 
+            CYCLE_FLAG + " BOOLEAN, \n" + 
+            LIMIT_REACHED_FLAG + " BOOLEAN \n" + 
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + 
             HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index eff39ab..e992126 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -22,10 +22,10 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -65,19 +65,19 @@ public class Sequence {
     private static final KeyValue CURRENT_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CURRENT_VALUE_BYTES);
     private static final KeyValue INCREMENT_BY_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
INCREMENT_BY_BYTES);
     private static final KeyValue CACHE_SIZE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CACHE_SIZE_BYTES);
-    private static final KeyValue START_WITH_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
START_WITH_BYTES);
     private static final KeyValue MIN_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
MIN_VALUE_BYTES);
     private static final KeyValue MAX_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
MAX_VALUE_BYTES);
     private static final KeyValue CYCLE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
CYCLE_FLAG_BYTES);
+    private static final KeyValue LIMIT_REACHED_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, 
LIMIT_REACHED_FLAG_BYTES);
     private static final List<KeyValue> SEQUENCE_KV_COLUMNS = 
Arrays.<KeyValue>asList(
             CURRENT_VALUE_KV,
             INCREMENT_BY_KV,
             CACHE_SIZE_KV,
-            START_WITH_KV,
-            // the following three columns were added in 3.0/4.0
+            // The following three columns were added in 3.1/4.1
             MIN_VALUE_KV,
             MAX_VALUE_KV,
-            CYCLE_KV
+            CYCLE_KV,
+            LIMIT_REACHED_KV
             );
     static {
         Collections.sort(SEQUENCE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -86,12 +86,12 @@ public class Sequence {
     private static final int CURRENT_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CURRENT_VALUE_KV);
     private static final int INCREMENT_BY_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(INCREMENT_BY_KV);
     private static final int CACHE_SIZE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CACHE_SIZE_KV);
-    private static final int START_WITH_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(START_WITH_KV);
     private static final int MIN_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(MIN_VALUE_KV);
     private static final int MAX_VALUE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(MAX_VALUE_KV);
     private static final int CYCLE_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(CYCLE_KV);
+    private static final int LIMIT_REACHED_INDEX = 
SEQUENCE_KV_COLUMNS.indexOf(LIMIT_REACHED_KV);
 
-    private static final int NUM_SEQUENCE_KEY_VALUES = 
SEQUENCE_KV_COLUMNS.size();
+    public static final int NUM_SEQUENCE_KEY_VALUES = 
SEQUENCE_KV_COLUMNS.size();
     private static final EmptySequenceCacheException 
EMPTY_SEQUENCE_CACHE_EXCEPTION = new EmptySequenceCacheException();
     
     private final SequenceKey key;
@@ -158,7 +158,6 @@ public class Sequence {
         
         long returnValue = value.currentValue;
         if (factor != 0) {
-            --value.unusedValues;
             boolean overflowOrUnderflow=false;
             // advance currentValue while checking for overflow
             try {
@@ -183,7 +182,7 @@ public class Sequence {
         if (value == null) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
-        if (value.unusedValues == 0) {
+        if (value.currentValue == value.nextValue) {
             if (action == ValueOp.VALIDATE_SEQUENCE) {
                 return value.currentValue;
             }
@@ -198,7 +197,7 @@ public class Sequence {
         }
         List<Append> appends = 
Lists.newArrayListWithExpectedSize(values.size());
         for (SequenceValue value : values) {
-            if (value.isInitialized() && value.unusedValues>0) {
+            if (value.isInitialized() && value.currentValue != 
value.nextValue) {
                 appends.add(newReturn(value));
             }
         }
@@ -210,7 +209,7 @@ public class Sequence {
         if (value == null) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
-        if (value.unusedValues==0) {
+        if (value.currentValue == value.nextValue) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
         return newReturn(value);
@@ -221,11 +220,12 @@ public class Sequence {
         Append append = new Append(key);
         byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()};
         append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf);
-        append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, 
PDataType.LONG.toBytes(value.startValue));
+        append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, 
PDataType.LONG.toBytes(value.nextValue));
         Map<byte[], List<KeyValue>> familyMap = append.getFamilyMap();
         familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
Arrays.<KeyValue>asList(
-                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, 
ByteUtil.EMPTY_BYTE_ARRAY),
-                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.START_WITH_BYTES, value.timestamp, 
PDataType.LONG.toBytes(value.currentValue))
+                       KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, 
PDataType.LONG.toBytes(value.currentValue)),
+                       // set LIMIT_REACHED flag to false since we are 
returning unused sequence values
+                       KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, 
PDataType.FALSE_BYTES)
                 ));
         return append;
     }
@@ -299,9 +299,9 @@ public class Sequence {
     private static KeyValue getKeyValue(Result r, KeyValue kv, int cellIndex) {
         KeyValue[] kvs = r.raw();
         // if the sequence row is from a previous version then MIN_VALUE, 
MAX_VALUE and CYCLE key values are not present,
-        // the sequence row has only four columns (START_VALUE, INCREMENT_BY, 
CACHE_SIZE and CURRENT_VALUE) and the order of the cells 
+        // the sequence row has only three columns (INCREMENT_BY, CACHE_SIZE 
and CURRENT_VALUE) and the order of the cells 
         // in the array returned by rawCells() is not what what we expect so 
use getColumnLatestCell() to get the cell we want
-        return kvs.length != NUM_SEQUENCE_KEY_VALUES ?  
r.getColumnLatest(kv.getFamily(), kv.getQualifier()) : (kvs[cellIndex]);
+        return kvs.length == NUM_SEQUENCE_KEY_VALUES ? kvs[cellIndex] : 
r.getColumnLatest(kv.getFamily(), kv.getQualifier());
     }
     
     private static KeyValue getKeyValue(Result r, KeyValue kv) {
@@ -320,10 +320,6 @@ public class Sequence {
         return getKeyValue(r, CACHE_SIZE_KV, CACHE_SIZE_INDEX);
     }
     
-    public static KeyValue getStartValueKV(Result r) {
-        return getKeyValue(r, START_WITH_KV, START_WITH_INDEX);
-    }
-    
     public static KeyValue getMinValueKV(Result r) {
         return getKeyValue(r, MIN_VALUE_KV, MIN_VALUE_INDEX);
     }
@@ -336,6 +332,10 @@ public class Sequence {
         return getKeyValue(r, CYCLE_KV, CYCLE_INDEX);
     }
     
+    public static KeyValue getLimitReachedKV(Result r) {
+        return getKeyValue(r, LIMIT_REACHED_KV, LIMIT_REACHED_INDEX);
+    }
+    
     public static void replaceCurrentValueKV(List<KeyValue> kvs, KeyValue 
currentValueKV) {
         kvs.set(CURRENT_VALUE_INDEX, currentValueKV);
     }
@@ -351,16 +351,31 @@ public class Sequence {
     public static void replaceCycleValueKV(List<KeyValue> kvs, KeyValue 
cycleValueKV) {
         kvs.set(CYCLE_INDEX, cycleValueKV);
     }
+    public static void replaceLimitReachedKV(List<KeyValue> kvs, KeyValue 
limitReachedKV) {
+        kvs.set(LIMIT_REACHED_INDEX, limitReachedKV);
+    }
     
     /**
-     * Returns a KeyValue[] for the result row. Handles empty MIN_VALUE, 
MAX_VALUE and CYCLE
-     * KeyValues if the sequence row is from a previous version
+     * Returns the KeyValues of r if it contains the expected number of 
KeyValues,
+     * else returns a list of KeyValues corresponding to SEQUENCE_KV_COLUMNS 
      */
-    public static List<KeyValue> getCells(Result r) {
+    public static List<KeyValue> getKeyValueList(Result r, int numKVs) {
         // if the sequence row is from a previous version
-        if (r.raw().length == NUM_SEQUENCE_KEY_VALUES )
+        if (r.raw().length == numKVs )
             return Lists.newArrayList(r.raw());
-        // else we need to handle missing MIN_VALUE, MAX_VALUE and CYCLE 
KeyValues
+        // else we need to handle missing MIN_VALUE, MAX_VALUE, CYCLE and 
LIMIT_REACHED KeyValues
+        List<KeyValue> kvList = 
Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES);
+        for (KeyValue kv : SEQUENCE_KV_COLUMNS) {
+            kvList.add(getKeyValue(r,kv));
+        }
+        return kvList;
+    }    
+    
+    /**
+     * Returns a list of KeyValues for the result row to be returned, adding 
only those KeyValues in r
+     * that are present in SEQUENCE_KV_COLUMNS
+     */
+    public static List<KeyValue> getCells(Result r) {
         List<KeyValue> kvList = 
Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES);
         for (KeyValue kv : SEQUENCE_KV_COLUMNS) {
             kvList.add(getKeyValue(r,kv));
@@ -371,15 +386,13 @@ public class Sequence {
     private static final class SequenceValue {
         public final long incrementBy;
         public final long timestamp;
+        public final long cacheSize;
         
         public long currentValue;
-        // start value of the current batch 
-        public long startValue;
+        public long nextValue;
         public long minValue;
         public long maxValue;
         public boolean cycle;
-        // number of values left in current batch
-        public long unusedValues;
         public boolean isDeleted;
         public boolean limitReached;
         
@@ -395,6 +408,7 @@ public class Sequence {
             this.isDeleted = isDeleted;
             this.incrementBy = 0;
             this.limitReached = false;
+            this.cacheSize = 0;
         }
         
         public boolean isInitialized() {
@@ -413,16 +427,14 @@ public class Sequence {
             KeyValue maxValueKV = getMaxValueKV(r);
             KeyValue cycleKV = getCycleKV(r);
             this.timestamp = currentValueKV.getTimestamp();
-            this.currentValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
+            this.nextValue = 
PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), 
currentValueKV.getValueOffset(), SortOrder.getDefault());
             this.incrementBy = 
PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), 
incrementByKV.getValueOffset(), SortOrder.getDefault());
-            this.unusedValues = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
+            this.cacheSize = 
PDataType.LONG.getCodec().decodeLong(cacheSizeKV.getBuffer(), 
cacheSizeKV.getValueOffset(), SortOrder.getDefault());
             this.minValue = 
PDataType.LONG.getCodec().decodeLong(minValueKV.getBuffer(), 
minValueKV.getValueOffset(), SortOrder.getDefault());
             this.maxValue = 
PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(), 
maxValueKV.getValueOffset(), SortOrder.getDefault());
             this.cycle = 
(Boolean)PDataType.BOOLEAN.toObject(cycleKV.getBuffer(), 
cycleKV.getValueOffset(), cycleKV.getValueLength());
             this.limitReached = false;
-            // store the start value of this batch of sequence values, so that 
it can be used to
-            // determine if we can return unused sequence values when we close 
the connection
-            this.startValue = this.currentValue;
+            currentValue = nextValue - incrementBy * cacheSize;
         }
     }
 
@@ -463,13 +475,14 @@ public class Sequence {
         byte[] startWithBuf = PDataType.LONG.toBytes(startWith);
         familyMap.put(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
Arrays.<KeyValue>asList(
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY),
-                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, 
ByteUtil.EMPTY_BYTE_ARRAY),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf),
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf),
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, 
PDataType.LONG.toBytes(incrementBy)),
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, 
PDataType.LONG.toBytes(cacheSize)),
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(minValue)),
                 KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, 
PDataType.LONG.toBytes(maxValue)),
-                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, 
PDataType.BOOLEAN.toBytes(cycle))
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, 
PDataType.BOOLEAN.toBytes(cycle)),
+                KeyValueUtil.newKeyValue(key, 
PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, 
PDataType.FALSE_BYTES)
                 ));
         return append;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
index 26ea132..be4455b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceInfo.java
@@ -17,6 +17,7 @@ public class SequenceInfo {
     public final long maxValue;
     public final long cacheSize;
     public final boolean cycle;
+    public boolean limitReached;
 
     public SequenceInfo(long sequenceValue, long incrementBy, long minValue, 
long maxValue, long cacheSize, boolean cycle) {
         this.sequenceValue = sequenceValue;
@@ -25,5 +26,6 @@ public class SequenceInfo {
         this.maxValue = maxValue;
         this.cacheSize = cacheSize;
         this.cycle = cycle;
+        this.limitReached = false;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 2f10c15..5102faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -76,14 +76,6 @@ public class KeyValueUtil {
                 value, valueOffset, valueLength);
     }
     
-    public static KeyValue newKeyValue(byte[] key, KeyValue kv, byte[] value) {
-        return newKeyValue(key, 0, key.length,
-            kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), 
-            kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), 
-            kv.getTimestamp(),
-            value, 0, value.length);
-    }
-
     public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long 
ts, byte[] value) {
         return newKeyValue(key,cf,cq,ts,value,0,value.length);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
index f17d721..f97d565 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
@@ -15,7 +15,6 @@ import java.sql.SQLException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.SequenceInfo;
-import org.apache.phoenix.schema.SequenceKey;
 
 import com.google.common.math.LongMath;
 
@@ -28,36 +27,28 @@ public class SequenceUtil {
      * Returns the nextValue of a sequence 
      * @throws SQLException if cycle is false and the sequence limit has been 
reached
      */
-    public static long getNextValue(long currentValue, long minValue, long 
maxValue,
-            long incrementBy, long cacheSize, boolean cycle) throws 
SQLException {
+    public static boolean checkIfLimitReached(long currentValue, long 
minValue, long maxValue,
+            long incrementBy, long cacheSize) throws SQLException {
         long nextValue = 0;
         boolean increasingSeq = incrementBy > 0 ? true : false;
-        boolean overflowOrUnderflow = false;
         // advance currentValue while checking for overflow    
         try {
             long incrementValue = LongMath.checkedMultiply(incrementBy, 
cacheSize);
             nextValue = LongMath.checkedAdd(currentValue, incrementValue);
         } catch (ArithmeticException e) {
-            overflowOrUnderflow = true;
+            return true;
         }
 
-        // check if overflow or limit was reached
-        if (overflowOrUnderflow || (increasingSeq && nextValue > maxValue)
-                || (!increasingSeq && nextValue < minValue)) {
-            if (cycle) {
-                nextValue = increasingSeq ? minValue : maxValue;
-            } else {
-                SQLExceptionCode code =
-                        increasingSeq ? 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
-                                : 
SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
-                throw new 
SQLExceptionInfo.Builder(code).build().buildException();
-            }
+        // check if limit was reached
+               if ((increasingSeq && nextValue > maxValue)
+                               || (!increasingSeq && nextValue < minValue)) {
+            return true;
         }
-        return nextValue;
+        return false;
     }
     
-    public static long getNextValue(SequenceKey key, SequenceInfo info) throws 
SQLException {
-        return getNextValue(info.sequenceValue, info.minValue, info.maxValue, 
info.incrementBy, info.cacheSize, info.cycle);
+    public static boolean checkIfLimitReached(SequenceInfo info) throws 
SQLException {
+        return checkIfLimitReached(info.sequenceValue, info.minValue, 
info.maxValue, info.incrementBy, info.cacheSize);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7cc2b5a5/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
index bb1ef49..f25a213 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
@@ -10,11 +10,11 @@
  */
 package org.apache.phoenix.util;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.SQLException;
 
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.junit.Test;
 
 public class SequenceUtilTest {
@@ -25,93 +25,41 @@ public class SequenceUtilTest {
 
     @Test
     public void testAscendingNextValueWithinLimit() throws SQLException {
-        assertEquals(9, SequenceUtil.getNextValue(5, MIN_VALUE, MAX_VALUE, 2/* 
incrementBy */,
-            CACHE_SIZE, false));
+        assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 
2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testAscendingNextValueReachLimit() throws SQLException {
-        assertEquals(MAX_VALUE, SequenceUtil.getNextValue(6, MIN_VALUE, 
MAX_VALUE, 2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 
2/* incrementBy */,  CACHE_SIZE));
     }
 
     @Test
-    public void testAscendingNextValueGreaterThanMaxValueNoCycle() throws 
SQLException {
-        try {
-            SequenceUtil.getNextValue(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
-    }
-
-    @Test
-    public void testAscendingNextValueGreaterThanMaxValueCycle() throws 
SQLException {
-        assertEquals(MIN_VALUE, SequenceUtil.getNextValue(MAX_VALUE, 
MIN_VALUE, MAX_VALUE,
-            2/* incrementBy */, CACHE_SIZE, true));
-    }
-    
-    @Test
-    public void testAscendingOverflowNoCycle() throws SQLException {
-        try {
-            SequenceUtil.getNextValue(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
+    public void testAscendingNextValueGreaterThanMaxValue() throws 
SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, 
MAX_VALUE, 2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
-    public void testAscendingOverflowCycle() throws SQLException {
-        assertEquals(0, SequenceUtil.getNextValue(Long.MAX_VALUE, 0, 
Long.MAX_VALUE,
-            1/* incrementBy */, CACHE_SIZE, true));
+    public void testAscendingOverflow() throws SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, 
Long.MAX_VALUE, 1/* incrementBy */, CACHE_SIZE));
     }
 
     @Test
     public void testDescendingNextValueWithinLimit() throws SQLException {
-        assertEquals(2, SequenceUtil.getNextValue(6, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testDescendingNextValueReachLimit() throws SQLException {
-        assertEquals(MIN_VALUE, SequenceUtil.getNextValue(5, MIN_VALUE, 
MAX_VALUE, -2/* incrementBy */,
-            CACHE_SIZE, false));
+       assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
 
     @Test
-    public void testDescendingNextValueLessThanMinValueNoCycle() throws 
SQLException {
-        try {
-            SequenceUtil.getNextValue(1, MIN_VALUE, MAX_VALUE, -2/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
-    }
-
-    @Test
-    public void testDescendingNextValueLessThanMinValueCycle() throws 
SQLException {
-        assertEquals(MAX_VALUE, SequenceUtil.getNextValue(2, MIN_VALUE, 
MAX_VALUE,
-            -2/* incrementBy */, CACHE_SIZE, true));
-    }
-    
-    @Test
-    public void testDescendingOverflowNoCycle() throws SQLException {
-        try {
-            SequenceUtil.getNextValue(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/* 
incrementBy */, CACHE_SIZE,
-                false);
-        } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
-                e.getErrorCode());
-        }
+    public void testDescendingNextValueLessThanMinValue() throws SQLException {
+       assertTrue(SequenceUtil.checkIfLimitReached(2, MIN_VALUE, MAX_VALUE, 
-2/* incrementBy */, CACHE_SIZE));
     }
     
     @Test
     public void testDescendingOverflowCycle() throws SQLException {
-        assertEquals(0, SequenceUtil.getNextValue(Long.MIN_VALUE, 
Long.MIN_VALUE, 0,
-            -1/* incrementBy */, CACHE_SIZE, true));
+       assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, 
Long.MIN_VALUE, 0, -1/* incrementBy */, CACHE_SIZE));
     }
 }

Reply via email to