Repository: phoenix
Updated Branches:
  refs/heads/4.8-HBase-1.0 cb6787894 -> 62ee84eff


PHOENIX-3230 Upgrade code running concurrently on different JVMs could make 
clients unusable


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62ee84ef
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62ee84ef
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62ee84ef

Branch: refs/heads/4.8-HBase-1.0
Commit: 62ee84eff374130d9aba1dcc4a3d931fd413ed6c
Parents: cb67878
Author: Samarth <samarth.j...@salesforce.com>
Authored: Thu Sep 22 15:43:46 2016 -0700
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Thu Sep 22 15:43:46 2016 -0700

----------------------------------------------------------------------
 .../phoenix/coprocessor/MetaDataProtocol.java   |  36 ++-
 .../phoenix/exception/SQLExceptionCode.java     |   5 +-
 .../query/ConnectionQueryServicesImpl.java      | 289 +++++++++++++++----
 .../apache/phoenix/query/QueryConstants.java    |   3 +-
 .../org/apache/phoenix/util/UpgradeUtil.java    |  18 +-
 5 files changed, 280 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/62ee84ef/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 8982fe7..04bef92 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.coprocessor;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
@@ -61,7 +63,7 @@ import com.google.protobuf.ByteString;
 public abstract class MetaDataProtocol extends MetaDataService {
     public static final int PHOENIX_MAJOR_VERSION = 4;
     public static final int PHOENIX_MINOR_VERSION = 8;
-    public static final int PHOENIX_PATCH_NUMBER = 0;
+    public static final int PHOENIX_PATCH_NUMBER = 1;
     public static final int PHOENIX_VERSION =
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
 
@@ -81,8 +83,26 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = 
MIN_TABLE_TIMESTAMP + 9;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 = 
MIN_TABLE_TIMESTAMP + 15;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 = 
MIN_TABLE_TIMESTAMP + 18;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 = 
MIN_TABLE_TIMESTAMP + 18;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the 
MIN_SYSTEM_TABLE_TIMESTAMP_* constants
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1;
+    
+    // ALWAYS update this map whenever rolling out a new release (major, minor 
or patch release). 
+    // Key is the SYSTEM.CATALOG timestamp for the version and value is the 
version string.
+    private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = 
new TreeMap<>();
+    static {
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0, "4.1.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0, "4.2.0");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1, "4.2.1");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, "4.3.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, "4.5.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, "4.6.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x");
+    }
+    
+    public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION 
+ "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; 
+    
     // TODO: pare this down to minimum, as we don't need duplicates for both 
table and column errors, nor should we need
     // a different code for every type of error.
     // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, 
ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION
@@ -382,4 +402,14 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
             return schema;
         }
     }
-}
+  
+    public static String getVersion(long serverTimestamp) {
+        /*
+         * It is possible that when clients are trying to run upgrades 
concurrently, we could be at an intermediate
+         * server timestamp. Using floorKey provides us a range based lookup 
where the timestamp range for a release is
+         * [timeStampForRelease, timestampForNextRelease).
+         */
+        String version = 
TIMESTAMP_VERSION_MAP.get(TIMESTAMP_VERSION_MAP.floorKey(serverTimestamp));
+        return version;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/62ee84ef/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5a8fffa..b0020cf 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -272,7 +272,7 @@ public enum SQLExceptionCode {
     DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1069, "43A69", "Default column 
family not allowed on VIEW or shared INDEX."),
     ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may 
be declared as transactional."),
     TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not 
be switched to non transactional."),
-       STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls 
must be true when a table is transactional."),
+    STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls 
must be true when a table is transactional."),
     CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a 
transaction on a connection with SCN set."),
     TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE(1074, "44A05", "A transactional 
table must define VERSION of greater than one."),
     CANNOT_SPECIFY_SCN_FOR_TXN_TABLE(1075, "44A06", "Cannot use a connection 
with SCN set for a transactional table."),
@@ -370,6 +370,7 @@ public enum SQLExceptionCode {
     OUTDATED_JARS(2007, "INT09", "Outdated jars."),
     INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index 
metadata. "),
     UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code."),
+    CONCURRENT_UPGRADE_IN_PROGRESS(2010, "INT12", ""),
     OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() {
         @Override
         public SQLException newException(SQLExceptionInfo info) {
@@ -483,4 +484,4 @@ public enum SQLExceptionCode {
         }
         return code;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/62ee84ef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 25963f8..6867b6d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -18,15 +18,20 @@
 package org.apache.phoenix.query;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
+import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
+import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
@@ -75,6 +80,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -84,7 +90,6 @@ import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -204,6 +209,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
@@ -261,14 +267,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private ScheduledExecutorService renewLeaseExecutor;
     private final boolean renewLeaseEnabled;
 
-
-
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
     }
 
     private final Map<Feature, FeatureSupported> featureMap = 
ImmutableMap.<Feature, FeatureSupported>of(
-            Feature.LOCAL_INDEX, new FeatureSupported(){
+            Feature.LOCAL_INDEX, new FeatureSupported() {
                 @Override
                 public boolean isSupported(ConnectionQueryServices services) {
                     int hbaseVersion = services.getLowestClusterHBaseVersion();
@@ -1518,8 +1522,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
             invalidateTables(result.getTableNamesToDelete());
             if (tableType == PTableType.TABLE) {
-                byte[] physicalName = table.getPhysicalName().getBytes();
                 long timestamp = 
MetaDataUtil.getClientTimeStamp(tableMetaData);
+                byte[] physicalName = table.getPhysicalName().getBytes();
                 ensureViewIndexTableDropped(physicalName, timestamp);
                 ensureLocalIndexTableDropped(physicalName, timestamp);
                 tableStatsCache.invalidate(new 
ImmutableBytesPtr(physicalName));
@@ -1593,7 +1597,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     }
 
     private void dropTables(final List<byte[]> tableNamesToDelete) throws 
SQLException {
-        try (HBaseAdmin admin = getAdmin()){
+        SQLException sqlE = null;
+        try (HBaseAdmin admin = getAdmin()) {
             if (tableNamesToDelete != null){
                 for ( byte[] tableName : tableNamesToDelete ) {
                     if ( admin.tableExists(tableName) ) {
@@ -1605,8 +1610,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
 
         } catch (IOException e) {
-            throw ServerUtil.parseServerException(e);
-        } 
+            sqlE = ServerUtil.parseServerException(e);
+        } finally {
+            if (sqlE != null) {
+                throw sqlE;
+            }
+        }
     }
 
     private static Map<String,Object> 
createPropertiesMap(Map<ImmutableBytesWritable,ImmutableBytesWritable> 
htableProps) {
@@ -2302,6 +2311,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         }
                         checkClosed();
                         PhoenixConnection metaConnection = null;
+                        boolean success = false;
+                        String snapshotName = null;
+                        String sysCatalogTableName = null;
                         try {
                             openConnection();
                             String noUpgradeProp = 
props.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
@@ -2335,17 +2347,20 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             }
                             try {
                                 
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
-
                             } catch (NewerTableAlreadyExistsException ignore) {
                                 // Ignore, as this will happen if the 
SYSTEM.CATALOG already exists at this fixed timestamp.
                                 // A TableAlreadyExistsException is not 
thrown, since the table only exists *after* this fixed timestamp.
                             } catch (TableAlreadyExistsException e) {
                                 if (upgradeSystemTables) {
-                                    // This will occur if we have an older 
SYSTEM.CATALOG and we need to update it to include
-                                    // any new columns we've added.
                                     long currentServerSideTableTimeStamp = 
e.getTable().getTimeStamp();
-
+                                    sysCatalogTableName = 
e.getTable().getPhysicalName().getString();
+                                    if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP && 
acquireUpgradeMutex(currentServerSideTableTimeStamp, 
e.getTable().getPhysicalName().getBytes())) {
+                                        snapshotName = 
getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
+                                        createSnapshot(snapshotName, 
sysCatalogTableName);
+                                    }
                                     String columnsToAdd = "";
+                                    // This will occur if we have an older 
SYSTEM.CATALOG and we need to update it to include
+                                    // any new columns we've added.
                                     if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
                                         // We know that we always need to add 
the STORE_NULLS column for 4.3 release
                                         columnsToAdd = addColumn(columnsToAdd, 
PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
@@ -2355,8 +2370,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                                 if 
(table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
                                                         && 
table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
                                                     
table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
-                                                        
MetaDataUtil.getUserTableName(table
-                                                            
.getNameAsString()));
+                                                            
MetaDataUtil.getUserTableName(table
+                                                                    
.getNameAsString()));
                                                     // Explicitly disable, 
modify and enable the table to ensure co-location of data
                                                     // and index regions. If 
we just modify the table descriptor when online schema
                                                     // change enabled may 
reopen the region in same region server instead of following data region.
@@ -2375,15 +2390,15 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                     // the column names that have been added 
to SYSTEM.CATALOG since 4.0.
                                     if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
                                         columnsToAdd = addColumn(columnsToAdd, 
PhoenixDatabaseMetaData.INDEX_TYPE + " " + 
PUnsignedTinyint.INSTANCE.getSqlTypeName()
-                                            + ", " + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName());
+                                                + ", " + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName());
                                     }
 
                                     // If we have some new columns from 
4.1-4.3 to add, add them now.
                                     if (!columnsToAdd.isEmpty()) {
                                         // Ugh..need to assign to another 
local variable to keep eclipse happy.
                                         PhoenixConnection newMetaConnection = 
addColumnsIfNotExists(metaConnection,
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
                                         metaConnection = newMetaConnection;
                                     }
 
@@ -2392,7 +2407,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                                 + 
PInteger.INSTANCE.getSqlTypeName();
                                         try {
                                             metaConnection = 
addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
+                                                    
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
                                             upgradeTo4_5_0(metaConnection);
                                         } catch (ColumnAlreadyExistsException 
ignored) {
                                             /*
@@ -2427,7 +2442,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                     if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
                                         columnsToAdd = 
PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + 
PBoolean.INSTANCE.getSqlTypeName();
                                         metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
                                     }
                                     if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
                                         // Drop old stats table so that new 
stats table is created
@@ -2449,30 +2464,31 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
                                     if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
                                         metaConnection = 
addColumnsIfNotExists(metaConnection,
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
-                                            
PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
-                                                    + 
PBoolean.INSTANCE.getSqlTypeName());
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
+                                                
PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+                                                        + 
PBoolean.INSTANCE.getSqlTypeName());
                                         metaConnection = 
addColumnsIfNotExists(metaConnection,
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
-                                            
PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
-                                                    + 
PVarchar.INSTANCE.getSqlTypeName());
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
+                                                
PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+                                                        + 
PVarchar.INSTANCE.getSqlTypeName());
                                         metaConnection = 
addColumnsIfNotExists(metaConnection,
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
-                                            
PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
-                                                    + 
PBoolean.INSTANCE.getSqlTypeName());
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
+                                                
PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+                                                        + 
PBoolean.INSTANCE.getSqlTypeName());
                                         metaConnection = 
UpgradeUtil.disableViewIndexes(metaConnection);
                                         
if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
-                                            
QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
+                                                
QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
                                             metaConnection = 
UpgradeUtil.upgradeLocalIndexes(metaConnection);
                                         }
                                         
ConnectionQueryServicesImpl.this.removeTable(null,
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
-                                            
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
+                                                
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
                                         clearCache();
                                     }
+
                                 }
                             }
 
@@ -2529,11 +2545,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                     long currentServerSideTableTimeStamp = 
e.getTable().getTimeStamp();
                                     if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
                                         metaConnection = addColumnsIfNotExists(
-                                                metaConnection,
-                                                
PhoenixDatabaseMetaData.SYSTEM_STATS_NAME,
-                                                
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
-                                                
PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
-                                                        + 
PLong.INSTANCE.getSqlTypeName());
+                                               metaConnection,
+                                               SYSTEM_STATS_NAME,
+                                               
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+                                               
PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
+                                                       + 
PLong.INSTANCE.getSqlTypeName());
                                     }
                                 }
                             }
@@ -2550,7 +2566,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                             + 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
                                 } catch (NewerSchemaAlreadyExistsException e) 
{}
                             }
-                            scheduleRenewLeaseTasks(); 
+                            success = true;
+                            scheduleRenewLeaseTasks();
+                        } catch (UpgradeInProgressException e) {
+                            // don't set it as initializationException because 
otherwise client won't be able to retry
+                            throw e;
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
                                 initializationException = (SQLException)e;
@@ -2569,6 +2589,15 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 }
                             } finally {
                                 try {
+                                    restoreFromSnapshot(sysCatalogTableName, 
snapshotName, success);
+                                } catch (SQLException e) {
+                                    if (initializationException != null) {
+                                        
initializationException.setNextException(e);
+                                    } else {
+                                        initializationException = e;
+                                    }
+                                }
+                                try {
                                     if (initializationException != null) {
                                         throw initializationException;
                                     }
@@ -2581,6 +2610,101 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     return null;
                 }
 
+                private void createSnapshot(String snapshotName, String 
tableName)
+                        throws SQLException {
+                    HBaseAdmin admin = null;
+                    SQLException sqlE = null;
+                    try {
+                        admin = getAdmin();
+                        admin.snapshot(snapshotName, tableName);
+                        logger.info("Successfully created snapshot " + 
snapshotName + " for "
+                                + tableName);
+                    } catch (Exception e) {
+                        sqlE = new SQLException(e);
+                    } finally {
+                        try {
+                            if (admin != null) {
+                                admin.close();
+                            }
+                        } catch (Exception e) {
+                            SQLException adminCloseEx = new SQLException(e);
+                            if (sqlE == null) {
+                                sqlE = adminCloseEx;
+                            } else {
+                                sqlE.setNextException(adminCloseEx);
+                            }
+                        } finally {
+                            if (sqlE != null) {
+                                throw sqlE;
+                            }
+                        }
+                    }
+                }
+
+                /**
+                 * Rolls {@code tableName} back to {@code snapshotName} after a failed upgrade.
+                 * <p>
+                 * Nothing happens when {@code success} is true or {@code snapshotName} is null. Otherwise the
+                 * table is disabled, restored from the snapshot and, provided it was disabled, enabled again
+                 * whether or not the restore itself worked. The admin is always closed and the first failure
+                 * is always reported, including failures that happen before the table could be disabled.
+                 * </p>
+                 *
+                 * @param tableName table to restore
+                 * @param snapshotName snapshot to restore from; may be null, in which case nothing is done
+                 * @param success whether the upgrade succeeded; when true nothing is done
+                 * @throws SQLException wrapping the first failure among disable/restore, enable and close;
+                 *             later failures are chained via {@link SQLException#setNextException}
+                 */
+                private void restoreFromSnapshot(String tableName, String snapshotName,
+                        boolean success) throws SQLException {
+                    if (success || snapshotName == null) {
+                        return;
+                    }
+                    boolean snapshotRestored = false;
+                    boolean tableDisabled = false;
+                    SQLException sqlE = null;
+                    HBaseAdmin admin = null;
+                    try {
+                        logger.warn("Starting restore of " + tableName + " using snapshot "
+                                + snapshotName + " because upgrade failed");
+                        admin = getAdmin();
+                        admin.disableTable(tableName);
+                        tableDisabled = true;
+                        admin.restoreSnapshot(snapshotName);
+                        snapshotRestored = true;
+                        logger.warn("Successfully restored " + tableName + " using snapshot "
+                                + snapshotName);
+                    } catch (Exception e) {
+                        sqlE = new SQLException(e);
+                    } finally {
+                        try {
+                            // Only re-enable a table that this method itself managed to disable.
+                            if (admin != null && tableDisabled) {
+                                admin.enableTable(tableName);
+                                if (snapshotRestored) {
+                                    logger.warn("Successfully restored and enabled " + tableName
+                                            + " using snapshot " + snapshotName);
+                                } else {
+                                    logger.warn("Successfully enabled " + tableName
+                                            + " after restoring using snapshot " + snapshotName + " failed. ");
+                                }
+                            }
+                        } catch (Exception e1) {
+                            SQLException enableTableEx = new SQLException(e1);
+                            if (sqlE == null) {
+                                sqlE = enableTableEx;
+                            } else {
+                                sqlE.setNextException(enableTableEx);
+                            }
+                            logger.error("Failure in enabling "
+                                    + tableName
+                                    + (snapshotRestored ? " after successfully restoring using snapshot "
+                                            + snapshotName
+                                            : " after restoring using snapshot "
+                                            + snapshotName + " failed. "), e1);
+                        } finally {
+                            // Close and rethrow unconditionally: previously both steps were skipped when the
+                            // table had not been disabled, which leaked the admin and swallowed the failure.
+                            try {
+                                if (admin != null) {
+                                    admin.close();
+                                }
+                            } catch (Exception e2) {
+                                SQLException adminCloseEx = new SQLException(e2);
+                                if (sqlE == null) {
+                                    sqlE = adminCloseEx;
+                                } else {
+                                    sqlE.setNextException(adminCloseEx);
+                                }
+                            } finally {
+                                if (sqlE != null) {
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    }
+                }
+
                 private void ensureSystemTablesUpgraded(ReadOnlyProps props)
                         throws SQLException, IOException, 
IllegalArgumentException, InterruptedException {
                     if 
(!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
@@ -2622,6 +2746,43 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         }
                     }
                 }
+
+                /**
+                 * Acquire a distributed mutex of sorts, using HBase's checkAndPut api, so that only one JVM
+                 * is able to run the upgrade code. The mutex is the {@link QueryConstants#UPGRADE_MUTEX}
+                 * column of the SYSTEM.CATALOG header row in the table named by {@code sysCatalogTableName}.
+                 * <p>
+                 * This method was added as part of the 4.8.1 release. For clients upgrading to 4.8.1 the
+                 * mutex column is expected to be absent, so the check is made against a null value. For
+                 * clients upgrading from 4.8.1 or later the check is made against
+                 * {@code currentServerSideTableTimestamp}. The client that wins the race sets the cell to
+                 * {@link MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP} for the release.
+                 * </p>
+                 *
+                 * @param currentServerSideTableTimestamp current server side timestamp of SYSTEM.CATALOG; must
+                 *            be less than {@code MIN_SYSTEM_TABLE_TIMESTAMP}
+                 * @param sysCatalogTableName name of the table holding the mutex cell, passed to getTable
+                 * @return true if this client won the race; the method never returns false
+                 * @throws UpgradeInProgressException if the checkAndPut was not applied
+                 * @throws IOException
+                 * @throws SQLException
+                 */
+                private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName)
+                        throws IOException, SQLException {
+                    Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
+                    try (HTableInterface catalogTable = getTable(sysCatalogTableName)) {
+                        byte[] mutexRow = SchemaUtil.getTableKey(null,
+                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+                        byte[] mutexFamily = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+                        byte[] mutexQualifier = QueryConstants.UPGRADE_MUTEX;
+                        byte[] expectedValue;
+                        if (currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1) {
+                            expectedValue = null;
+                        } else {
+                            expectedValue = PLong.INSTANCE.toBytes(currentServerSideTableTimestamp);
+                        }
+                        byte[] claimedValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
+                        // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX
+                        // column isn't used to calculate SYSTEM.CATALOG's server side timestamp.
+                        Put mutexPut = new Put(mutexRow);
+                        mutexPut.add(mutexFamily, mutexQualifier, claimedValue);
+                        if (catalogTable.checkAndPut(mutexRow, mutexFamily, mutexQualifier, expectedValue, mutexPut)) {
+                            return true;
+                        }
+                        throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
+                                getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
+                    }
+                }
             });
         } catch (Exception e) {
             Throwables.propagateIfInstanceOf(e, SQLException.class);
@@ -2629,6 +2790,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    /**
+     * Raised when another client is already upgrading the cluster. Carries the SQL state and error code
+     * of {@link SQLExceptionCode#CONCURRENT_UPGRADE_IN_PROGRESS}.
+     */
+    private static class UpgradeInProgressException extends SQLException {
+        /**
+         * @param upgradeFrom version the cluster is being upgraded from, used in the message
+         * @param upgradeTo version the cluster is being upgraded to, used in the message
+         */
+        public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
+            super(describe(upgradeFrom, upgradeTo),
+                    SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getSQLState(),
+                    SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
+        }
+
+        /** Builds the exception message shown to the caller. */
+        private static String describe(String upgradeFrom, String upgradeTo) {
+            return "Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
+                    + ". Please retry establishing connection.";
+        }
+    }
+
     private List<String> getTableNames(List<HTableDescriptor> tables) {
         List<String> tableNames = new ArrayList<String>(4);
         for (HTableDescriptor desc : tables) {
@@ -2644,20 +2813,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             return columnsToAddSoFar + ", " + columns;
         }
     }
-    
-    private void scheduleRenewLeaseTasks() {
-        if (isRenewingLeasesEnabled()) {
-            ThreadFactory threadFactory =
-                    new ThreadFactoryBuilder().setDaemon(true)
-                    .setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + 
"-thread-%s").build();
-            renewLeaseExecutor =
-                    Executors.newScheduledThreadPool(renewLeasePoolSize, 
threadFactory);
-            for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> q : 
connectionQueues) {
-                renewLeaseExecutor.scheduleAtFixedRate(new RenewLeaseTask(q), 
0,
-                        renewLeaseTaskFrequency, TimeUnit.MILLISECONDS);
-            }
-        }
-    }   
 
     /**
      * Set IMMUTABLE_ROWS to true for all index tables over immutable tables.
@@ -2782,6 +2937,20 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return metaConnection;
     }
 
+    /**
+     * Starts the scanner lease renewal tasks when lease renewal is enabled; does nothing otherwise.
+     * A scheduled pool of {@code renewLeasePoolSize} daemon threads is created and one
+     * {@link RenewLeaseTask} per queue in {@code connectionQueues} is scheduled on it at a fixed rate of
+     * {@code renewLeaseTaskFrequency} milliseconds, with no initial delay.
+     */
+    private void scheduleRenewLeaseTasks() {
+        if (!isRenewingLeasesEnabled()) {
+            return;
+        }
+        ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+        factoryBuilder.setDaemon(true);
+        factoryBuilder.setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + "-thread-%s");
+        ThreadFactory renewLeaseThreadFactory = factoryBuilder.build();
+        renewLeaseExecutor = Executors.newScheduledThreadPool(renewLeasePoolSize, renewLeaseThreadFactory);
+        for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionQueue : connectionQueues) {
+            RenewLeaseTask renewLeaseTask = new RenewLeaseTask(connectionQueue);
+            renewLeaseExecutor.scheduleAtFixedRate(renewLeaseTask, 0, renewLeaseTaskFrequency,
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
     private static int getSaltBuckets(TableAlreadyExistsException e) {
         PTable table = e.getTable();
         Integer sequenceSaltBuckets = table == null ? null : 
table.getBucketNum();
@@ -2885,8 +3054,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     @Override
     public HBaseAdmin getAdmin() throws SQLException {
         try {
-            // do not use the HBaseAdmin(this.config) constructor
-            // since it always establishes a new HConnection which is 
expensive.
             return new HBaseAdmin(connection);
         } catch (IOException e) {
             throw new PhoenixIOException(e);
@@ -3274,10 +3441,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
         }
     }
-    
-    private int getQueueIndex(PhoenixConnection conn) {
-        return ThreadLocalRandom.current().nextInt(renewLeasePoolSize);
-    }
 
     @Override
     public void removeConnection(PhoenixConnection connection) throws 
SQLException {
@@ -3300,6 +3463,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    /**
+     * Picks a random queue index in the range {@code [0, renewLeasePoolSize)}.
+     *
+     * @param conn connection needing a queue; not consulted when choosing the index
+     * @return the chosen index
+     */
+    private int getQueueIndex(PhoenixConnection conn) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        return random.nextInt(renewLeasePoolSize);
+    }
+
     @Override
     public KeyValueBuilder getKeyValueBuilder() {
         return this.kvBuilder;
@@ -3712,4 +3879,4 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public void invalidateStats(ImmutableBytesPtr tableName) {
         this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/62ee84ef/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b3a5a36..b87135d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -362,5 +362,6 @@ public interface QueryConstants {
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
+    public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/62ee84ef/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index d7ed01a..5414bdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
@@ -29,7 +31,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
@@ -51,10 +52,13 @@ import static 
org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.text.Format;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1576,9 +1580,9 @@ public class UpgradeUtil {
     }
 
     public static void addRowKeyOrderOptimizableCell(List<Mutation> 
tableMetadata, byte[] tableHeaderRowKey, long clientTimeStamp) {
-        Put put = new Put(tableHeaderRowKey);
-        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE_BYTES, 
clientTimeStamp, PBoolean.INSTANCE.toBytes(true));
+        Put put = new Put(tableHeaderRowKey, clientTimeStamp);
+        put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE_BYTES, 
PBoolean.INSTANCE.toBytes(true));
         tableMetadata.add(put);
     }
 
@@ -1888,4 +1892,10 @@ public class UpgradeUtil {
         }
     }
 
+    /**
+     * Builds a snapshot name of the form
+     * {@code SNAPSHOT_<table>_<fromVersion>_TO_<CURRENT_CLIENT_VERSION>_<yyyyMMddHHmmss>}, where the
+     * trailing field is the current time in the JVM's default time zone.
+     *
+     * @param tableString table name to embed in the snapshot name
+     * @param currentSystemTableTimestamp system table timestamp being upgraded from; converted to a
+     *            version string with {@code getVersion}
+     * @return the snapshot name
+     */
+    public static final String getUpgradeSnapshotName(String tableString, long currentSystemTableTimestamp) {
+        // Deliberately no zone field: SimpleDateFormat's "Z" renders as "+hhmm" for zones east of UTC, and
+        // '+' is not a legal character in an HBase snapshot name. A fresh formatter per call also avoids
+        // sharing the non-thread-safe SimpleDateFormat.
+        Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");
+        String date = formatter.format(new Date(System.currentTimeMillis()));
+        String upgradingFrom = getVersion(currentSystemTableTimestamp);
+        return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date;
+    }
 }
\ No newline at end of file

Reply via email to