Repository: phoenix
Updated Branches:
  refs/heads/4.0 4cd5826e7 -> 0944386f8


PHOENIX-1357 Salt sequence table to prevent same RS from getting all sequence 
calls

Conflicts:
        
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
        
phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e063d63
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e063d63
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e063d63

Branch: refs/heads/4.0
Commit: 7e063d63d7cb29cb01e790f49d1ff42ddb042980
Parents: 4cd5826
Author: James Taylor <[email protected]>
Authored: Wed Oct 15 22:12:15 2014 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Oct 17 01:07:12 2014 -0700

----------------------------------------------------------------------
 .../phoenix/coprocessor/MetaDataProtocol.java   |   6 +-
 .../query/ConnectionQueryServicesImpl.java      |  22 +--
 .../apache/phoenix/query/QueryConstants.java    |   4 +-
 .../org/apache/phoenix/schema/Sequence.java     |   9 +-
 .../org/apache/phoenix/schema/SequenceKey.java  |  49 +++---
 .../org/apache/phoenix/util/SchemaUtil.java     |   8 -
 .../org/apache/phoenix/util/UpgradeUtil.java    | 167 +++++++++++++++++++
 7 files changed, 215 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 8e61f1b..ac1287a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -58,9 +58,9 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
     
     public static final long MIN_TABLE_TIMESTAMP = 0;
-    // Each time a column is added to the SYSTEM.CATALOG, this should be 
increased.
-    // Adding INDEX_TYPE column for local indexing
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 3;
+
+    // Incremented from 3 to 4 to salt the sequence table in 3.2/4.2
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP 
+ 4;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
 
     // TODO: pare this down to minimum, as we don't need duplicates for both 
table and column errors, nor should we need

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 977a7e7..7b1278a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -134,6 +134,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1519,13 +1520,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 // Ignore, as this will happen if the 
SYSTEM.CATALOG already exists at this fixed timestamp.
                                 // A TableAlreadyExistsException is not 
thrown, since the table only exists *after* this fixed timestamp.
                             } catch (TableAlreadyExistsException ignore) {
-                                // This will occur if we have an older 
SYSTEM.CATALOG and we need to update it to include
-                                // any new columns we've added.
-                                metaConnection = 
addColumnsIfNotExists(metaConnection, 
-                                  PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
-                                  MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, 
-                                  PhoenixDatabaseMetaData.INDEX_TYPE + " " + 
PDataType.UNSIGNED_TINYINT.getSqlTypeName() + 
-                                  ", " + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + 
PDataType.LONG.getSqlTypeName());
                             }
                             try {
                                 
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
@@ -1536,13 +1530,13 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             } catch (TableAlreadyExistsException ignore) {
                                 // This will occur if we have an older 
SYSTEM.SEQUENCE, so we need to update it to include
                                 // any new columns we've added.
-                                String newColumns = 
PhoenixDatabaseMetaData.MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
-                                        + PhoenixDatabaseMetaData.MAX_VALUE + 
" " + PDataType.LONG.getSqlTypeName() + ", " + 
PhoenixDatabaseMetaData.CYCLE_FLAG + " "
-                                        + PDataType.BOOLEAN.getSqlTypeName() + 
", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " "
-                                        + PDataType.BOOLEAN.getSqlTypeName();
-                                metaConnection = 
addColumnsIfNotExists(metaConnection,
-                                        
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
-                                        
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns);
+                                if 
(UpgradeUtil.addSaltByteToSequenceTable(metaConnection)) {
+                                    metaConnection.removeTable(null,
+                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                                            
PhoenixDatabaseMetaData.TYPE_SEQUENCE,
+                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+                                    clearCache();
+                                }
                             }
                             try {
                                 metaConnection.createStatement().executeUpdate(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 4ad3159..1fd7b15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -266,6 +267,7 @@ public interface QueryConstants {
             CYCLE_FLAG + " BOOLEAN, \n" + 
             LIMIT_REACHED_FLAG + " BOOLEAN \n" + 
             " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + 
-            HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
+            HConstants.VERSIONS + "=" + 
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            "SALT_BUCKETS=" + SaltingUtil.MAX_BUCKET_NUM + "\n";
        
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index 4dff12c..ae822e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -50,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
 
 import com.google.common.collect.Lists;
@@ -217,7 +216,7 @@ public class Sequence {
     }
 
     private Append newReturn(SequenceValue value) {
-        byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), 
this.key.getSchemaName(), this.key.getSequenceName());
+        byte[] key = this.key.getKey();
         Append append = new Append(key);
         byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()};
         append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf);
@@ -275,7 +274,7 @@ public class Sequence {
 
     @SuppressWarnings("deprecation")
     public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
-        Increment inc = new 
Increment(SchemaUtil.getSequenceKey(key.getTenantId(), key.getSchemaName(), 
key.getSequenceName()));
+        Increment inc = new Increment(key.getKey());
         // It doesn't matter what we set the amount too - we always use the 
values we get
         // from the Get we do to prevent any race conditions. All columns that 
get added
         // are returned with their current value
@@ -457,7 +456,7 @@ public class Sequence {
     }
 
     public Append createSequence(long startWith, long incrementBy, long 
cacheSize, long timestamp, long minValue, long maxValue, boolean cycle) {
-        byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), 
this.key.getSchemaName(), this.key.getSequenceName());
+        byte[] key = this.key.getKey();
         Append append = new Append(key);
         append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new 
byte[] {(byte)MetaOp.CREATE_SEQUENCE.ordinal()});
         if (timestamp != HConstants.LATEST_TIMESTAMP) {
@@ -496,7 +495,7 @@ public class Sequence {
     }
 
     public Append dropSequence(long timestamp) {
-        byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), 
this.key.getSchemaName(), this.key.getSequenceName());
+        byte[] key =  this.key.getKey();
         Append append = new Append(key);
         append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new 
byte[] {(byte)MetaOp.DROP_SEQUENCE.ordinal()});
         if (timestamp != HConstants.LATEST_TIMESTAMP) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
index 644fc4a..c25e438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
@@ -17,37 +17,29 @@
  */
 package org.apache.phoenix.schema;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
 
-public class SequenceKey implements Comparable<SequenceKey> {
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((tenantId == null) ? 0 : 
tenantId.hashCode());
-        result = prime * result + ((schemaName == null) ? 0 : 
schemaName.hashCode());
-        result = prime * result + sequenceName.hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) return true;
-        if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        SequenceKey other = (SequenceKey)obj;
-        return this.compareTo(other) == 0;
-    }
 
+public class SequenceKey implements Comparable<SequenceKey> {
     private final String tenantId;
     private final String schemaName;
     private final String sequenceName;
+    private final byte[] key;
     
     public SequenceKey(String tenantId, String schemaName, String 
sequenceName) {
         this.tenantId = tenantId;
         this.schemaName = schemaName;
         this.sequenceName = sequenceName;
+        this.key = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, 
tenantId == null  ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId), 
QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName), 
QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
+        key[0] = SaltingUtil.getSaltingByte(key, 
SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES, 
SaltingUtil.MAX_BUCKET_NUM);
     }
 
+    public byte[] getKey() {
+        return key;
+
+    }
     public String getTenantId() {
         return tenantId;
     }
@@ -71,4 +63,23 @@ public class SequenceKey implements Comparable<SequenceKey> {
         }
         return c;
     }
+    
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((tenantId == null) ? 0 : 
tenantId.hashCode());
+        result = prime * result + ((schemaName == null) ? 0 : 
schemaName.hashCode());
+        result = prime * result + sequenceName.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        SequenceKey other = (SequenceKey)obj;
+        return this.compareTo(other) == 0;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 5cc861b..30c328b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -161,14 +161,6 @@ public class SchemaUtil {
         return l3;
     }
 
-    public static byte[] getSequenceKey(byte[] tenantId, byte[] schemaName, 
byte[] sequenceName) {
-        return getTableKey(tenantId, schemaName, sequenceName);
-    }
-
-    public static byte[] getSequenceKey(String tenantId, String schemaName, 
String sequenceName) {
-        return getTableKey(tenantId, schemaName, sequenceName);
-    }
-
     /**
      * Get the key used in the Phoenix metadata row for a table definition
      * @param schemaName

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e063d63/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
new file mode 100644
index 0000000..4c8a369
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpgradeUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(UpgradeUtil.class);
+
+    private UpgradeUtil() {
+    }
+
+    public static boolean addSaltByteToSequenceTable(PhoenixConnection conn) 
throws SQLException {
+        logger.info("Upgrading SYSTEM.SEQUENCE table");
+
+        HTableInterface sysTable = 
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+        try {
+            byte[] seqTableKey = SchemaUtil.getTableKey(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.TYPE_SEQUENCE);
+            logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " 
+ SaltingUtil.MAX_BUCKET_NUM);
+            KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey, 
+                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                    PDataType.INTEGER.toBytes(SaltingUtil.MAX_BUCKET_NUM));
+            Put put = new Put(seqTableKey);
+            put.add(saltKV);
+            // Prevent multiple clients from doing this upgrade
+            if (!sysTable.checkAndPut(seqTableKey,
+                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, put)) {
+
+                logger.info("SYSTEM.SEQUENCE table has already been upgraded");
+                return false;
+            }
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                sysTable.close();
+            } catch (IOException e) {
+                logger.warn("Exception during close",e);
+            }
+        }
+        int batchSizeBytes = 100 * 1024; // 100K chunks
+        int sizeBytes = 0;
+        List<Mutation> mutations =  Lists.newArrayListWithExpectedSize(10000);
+
+        boolean success = false;
+        Scan scan = new Scan();
+        scan.setRaw(true);
+        scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+        HTableInterface seqTable = 
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+        try {
+            logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+            ResultScanner scanner = seqTable.getScanner(scan);
+            try {
+                Result result;
+                while ((result = scanner.next()) != null) {
+                    for (KeyValue keyValue : result.raw()) {
+                        KeyValue newKeyValue = addSaltByte(keyValue);
+                        sizeBytes += newKeyValue.getLength();
+                        if (KeyValue.Type.codeToType(newKeyValue.getType()) == 
KeyValue.Type.Put) {
+                            // Delete old value
+                            byte[] buf = keyValue.getBuffer();
+                            Delete delete = new Delete(keyValue.getRow());
+                            KeyValue deleteKeyValue = new KeyValue(buf, 
keyValue.getRowOffset(), keyValue.getRowLength(),
+                                    buf, keyValue.getFamilyOffset(), 
keyValue.getFamilyLength(),
+                                    buf, keyValue.getQualifierOffset(), 
keyValue.getQualifierLength(),
+                                    keyValue.getTimestamp(), 
KeyValue.Type.Delete,
+                                    ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+                            delete.addDeleteMarker(deleteKeyValue);
+                            mutations.add(delete);
+                            sizeBytes += deleteKeyValue.getLength();
+                            // Put new value
+                            Put put = new Put(newKeyValue.getRow());
+                            put.add(newKeyValue);
+                            mutations.add(put);
+                        } else if 
(KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+                            // Copy delete marker using new key so that it 
continues
+                            // to delete the key value preceding it that will 
be updated
+                            // as well.
+                            Delete delete = new Delete(newKeyValue.getRow());
+                            delete.addDeleteMarker(newKeyValue);
+                            mutations.add(delete);
+                        }
+                        if (sizeBytes >= batchSizeBytes) {
+                            logger.info("Committing bactch of SYSTEM.SEQUENCE 
rows");
+                            seqTable.batch(mutations);
+                            mutations.clear();
+                            sizeBytes = 0;
+                        }
+                    }
+                }
+                if (!mutations.isEmpty()) {
+                    logger.info("Committing last bactch of SYSTEM.SEQUENCE 
rows");
+                    seqTable.batch(mutations);
+                }
+                logger.info("Successfully completed upgrade of 
SYSTEM.SEQUENCE");
+                success = true;
+                return true;
+            } catch (InterruptedException e) {
+                throw ServerUtil.parseServerException(e);
+            } finally {
+                if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN 
CORRUPT STATE");
+                scanner.close();
+            }
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                seqTable.close();
+            } catch (IOException e) {
+                logger.warn("Exception during close",e);
+            }
+        }
+    }
+    
+    private static KeyValue addSaltByte(KeyValue keyValue) {
+        int length = keyValue.getRowLength();
+        int offset = keyValue.getRowOffset();
+        byte[] buf = keyValue.getBuffer();
+        byte[] newBuf = new byte[length + 1];
+        System.arraycopy(buf, offset, newBuf, SaltingUtil.NUM_SALTING_BYTES, 
length);
+        newBuf[0] = SaltingUtil.getSaltingByte(newBuf, 
SaltingUtil.NUM_SALTING_BYTES, length, SaltingUtil.MAX_BUCKET_NUM);
+        return new KeyValue(newBuf, 0, newBuf.length,
+                buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+                buf, keyValue.getQualifierOffset(), 
keyValue.getQualifierLength(),
+                keyValue.getTimestamp(), 
KeyValue.Type.codeToType(keyValue.getType()),
+                buf, keyValue.getValueOffset(), keyValue.getValueLength());
+    }
+
+}

Reply via email to