Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.4 6e9244e79 -> d0d7daecd


PHOENIX-4799 Write cells using checkAndMutate to prevent conflicting changes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d0d7daec
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d0d7daec
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d0d7daec

Branch: refs/heads/4.x-HBase-1.4
Commit: d0d7daecd9480261c2bcd94436c3e33abb973399
Parents: 6e9244e
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Tue Jul 24 10:49:31 2018 -0700
Committer: Thomas D'Silva <tdsi...@apache.org>
Committed: Mon Aug 6 10:53:27 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/AlterTableWithViewsIT.java  |   2 +-
 .../phoenix/end2end/BasePermissionsIT.java      |  66 +++---
 .../phoenix/end2end/ChangePermissionsIT.java    |   4 +-
 .../MigrateSystemTablesToSystemNamespaceIT.java |  13 +-
 .../end2end/QueryDatabaseMetaDataIT.java        |   4 +
 .../phoenix/end2end/TableDDLPermissionsIT.java  |   9 +-
 .../end2end/TenantSpecificTablesDDLIT.java      |   2 +
 .../org/apache/phoenix/end2end/UpgradeIT.java   |  34 +--
 .../java/org/apache/phoenix/end2end/ViewIT.java | 235 +++++++++++++++++--
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +
 .../phoenix/query/ConnectionQueryServices.java  |  14 ++
 .../query/ConnectionQueryServicesImpl.java      | 170 +++++++-------
 .../query/ConnectionlessQueryServicesImpl.java  |  20 ++
 .../query/DelegateConnectionQueryServices.java  |  11 +
 .../apache/phoenix/query/QueryConstants.java    |  14 ++
 .../apache/phoenix/schema/MetaDataClient.java   |  95 +++++++-
 16 files changed, 523 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
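
For readers skimming the diff: the core idea is that each DDL path that used to race on SYSTEM.CATALOG now first claims a cell in SYSTEM.MUTEX with an HBase checkAndPut and deletes that cell when the operation finishes. Below is a minimal sketch of that pattern against the plain HBase 1.x client API; the table, family, and qualifier names are illustrative placeholders, not the ones Phoenix uses.

    // Sketch of the checkAndPut-based mutex pattern this commit applies to
    // SYSTEM.MUTEX, written against the plain HBase 1.x client API.
    // Table, family, and qualifier names are illustrative placeholders.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MutexCellExample {
        private static final byte[] FAMILY = Bytes.toBytes("f");
        private static final byte[] QUALIFIER = Bytes.toBytes("MUTEX_VALUE");
        private static final byte[] LOCKED = Bytes.toBytes("MUTEX_LOCKED");

        public static void main(String[] args) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                    Table mutexTable = conn.getTable(TableName.valueOf("EXAMPLE.MUTEX"))) {
                byte[] rowKey = Bytes.toBytes("MY_SCHEMA.MY_TABLE");
                Put put = new Put(rowKey);
                put.addColumn(FAMILY, QUALIFIER, LOCKED);
                // checkAndPut writes the cell only if it is currently absent,
                // so exactly one client acquires the mutex for this row key
                boolean acquired = mutexTable.checkAndPut(rowKey, FAMILY, QUALIFIER, null, put);
                if (!acquired) {
                    System.out.println("another client holds the mutex, giving up");
                    return;
                }
                try {
                    // ... perform the metadata change that must not run concurrently ...
                } finally {
                    // release the mutex by deleting the cell
                    Delete delete = new Delete(rowKey);
                    delete.addColumn(FAMILY, QUALIFIER);
                    mutexTable.delete(delete);
                }
            }
        }
    }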


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index e39d492..e97a40d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -530,7 +530,7 @@ public class AlterTableWithViewsIT extends SplitSystemCatalogIT {
             
             try {
                 // should fail because there are two views with different pk columns
-                conn.createStatement().execute("ALTER TABLE " + tableName + " ADD VIEW_COL1 DECIMAL PRIMARY KEY, VIEW_COL2 VARCHAR PRIMARY KEY");
+                conn.createStatement().execute("ALTER TABLE " + tableName + " ADD VIEW_COL1 DECIMAL(10,2) PRIMARY KEY, VIEW_COL2 VARCHAR(256) PRIMARY KEY");
                 fail();
             }
             catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index d33d538..88a942e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -16,8 +16,29 @@
  */
 package org.apache.phoenix.end2end;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Throwables;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,34 +57,13 @@ import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 
 @RunWith(Parameterized.class)
 public class BasePermissionsIT extends BaseTest {
@@ -73,17 +73,23 @@ public class BasePermissionsIT extends BaseTest {
     static String SUPERUSER;
 
     static HBaseTestingUtility testUtil;
-    static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
-            "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION"));
+    static final Set<String> PHOENIX_SYSTEM_TABLES =
+            new HashSet<>(Arrays.asList("SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS",
+                "SYSTEM.FUNCTION", "SYSTEM.MUTEX"));
 
-    static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS = new HashSet<>(Arrays.asList(
-            "SYSTEM.\"CATALOG\"", "SYSTEM.\"SEQUENCE\"", "SYSTEM.\"STATS\"", "SYSTEM.\"FUNCTION\""));
+    static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS =
+            new HashSet<>(Arrays.asList("SYSTEM.\"CATALOG\"", "SYSTEM.\"SEQUENCE\"",
+                "SYSTEM.\"STATS\"", "SYSTEM.\"FUNCTION\"", "SYSTEM.\"MUTEX\""));
 
     static final String SYSTEM_SEQUENCE_IDENTIFIER =
             QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE+ "\"";
 
+    static final String SYSTEM_MUTEX_IDENTIFIER =
+            QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\""
+                    + PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME + "\"";
+
     static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
-            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION"));
+            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION", "SYSTEM:MUTEX"));
 
     // Create Multiple users so that we can use Hadoop UGI to run tasks as various users
     // Permissions can be granted or revoke by superusers and admins only

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
index 3965f69..7f2964d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
@@ -57,7 +57,8 @@ public class ChangePermissionsIT extends BasePermissionsIT {
             } else {
                 verifyAllowed(grantPermissions("RX", user, 
PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
             }
-            verifyAllowed(grantPermissions("W", user, 
SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+            verifyAllowed(grantPermissions("RWX", user, 
SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+            verifyAllowed(grantPermissions("RWX", user, 
SYSTEM_MUTEX_IDENTIFIER, false), superUser);
         }
     }
 
@@ -69,6 +70,7 @@ public class ChangePermissionsIT extends BasePermissionsIT {
                 verifyAllowed(revokePermissions(user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
             }
             verifyAllowed(revokePermissions(user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+            verifyAllowed(revokePermissions(user, SYSTEM_MUTEX_IDENTIFIER, false), superUser);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index d253f6e..ffac4d6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -54,7 +54,6 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -275,16 +274,15 @@ public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
 
     private void changeMutexLock(Properties clientProps, boolean acquire) throws SQLException, IOException {
         ConnectionQueryServices services = null;
-        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
 
         try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProps)) {
             services = conn.unwrap(PhoenixConnection.class).getQueryServices();
             if(acquire) {
                assertTrue(((ConnectionQueryServicesImpl) services)
-                        .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey));
+                        .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP));
             } else {
-                ((ConnectionQueryServicesImpl) services).releaseUpgradeMutex(mutexRowKey);
+                services.deleteMutexCell(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, null, null);
             }
             }
         }
     }
@@ -397,14 +395,13 @@ public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
             }
         }
 
-        // The set will contain SYSMUTEX table since that table is not exposed in SYSCAT
         if (systemTablesMapped) {
             if (!systemSchemaExists) {
                 fail(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + " entry doesn't exist in SYSTEM.CATALOG table.");
             }
             }
-            assertTrue(namespaceMappedSystemTablesSet.size() == 1);
+            assertTrue(namespaceMappedSystemTablesSet.isEmpty());
         } else {
-            assertTrue(systemTablesSet.size() == 1);
+            assertTrue(systemTablesSet.isEmpty());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 90f9db6..7d31c62 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
 import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
@@ -175,6 +176,9 @@ public class QueryDatabaseMetaDataIT extends ParallelStatsDisabledIT {
             assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());
             assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+            assertEquals(SYSTEM_MUTEX_TABLE_NAME, rs.getString("TABLE_NAME"));
+            assertTrue(rs.next());
+            assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
             assertEquals(TYPE_SEQUENCE, rs.getString("TABLE_NAME"));
             assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 8666bb8..86a6b60 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -20,7 +20,6 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.Connection;
 import java.util.Collections;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -54,6 +53,10 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
                         Action.READ, Action.EXEC);
                 grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
                         Action.READ, Action.EXEC);
+                grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE,
+                    Action.READ, Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE,
+                    Action.READ, Action.EXEC);
                 
             } else {
                 grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
@@ -64,6 +67,10 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
                         Action.READ, Action.EXEC);
                 grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
                         Action.READ, Action.EXEC);
+                grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE,
+                    Action.READ, Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE,
+                    Action.READ, Action.EXEC);
             }
         } catch (Throwable e) {
             if (e instanceof Exception) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index dd6f7f7..956b43c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -498,6 +498,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, PTableType.SYSTEM);
             assertTrue(rs.next());
+            assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME, PTableType.SYSTEM);
+            assertTrue(rs.next());
             assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 2b866a5..c2cf9e6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -18,8 +18,6 @@
 package org.apache.phoenix.end2end;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX;
-import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -42,8 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
@@ -425,39 +421,21 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         }
     }
     
-    private void putUnlockKVInSysMutex(byte[] row) throws Exception {
-        try (Connection conn = getConnection(false, null)) {
-            ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            try (HTableInterface sysMutexTable = services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
-                byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
-                byte[] qualifier = UPGRADE_MUTEX;
-                Put put = new Put(row);
-                put.addColumn(family, qualifier, UPGRADE_MUTEX_UNLOCKED);
-                sysMutexTable.put(put);
-                sysMutexTable.flushCommits();
-            }
-        }
-    }
-    
     @Test
     public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
         ConnectionQueryServices services = null;
-        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                generateUniqueName());
         try (Connection conn = getConnection(false, null)) {
             services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            putUnlockKVInSysMutex(mutexRowKey);
             assertTrue(((ConnectionQueryServicesImpl)services)
-                    .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey));
+                    .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0));
             try {
                 ((ConnectionQueryServicesImpl)services)
-                        .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+                        .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
                 fail();
             } catch (UpgradeInProgressException expected) {
 
             }
-            assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
-            assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
+            ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex();
         }
     }
     
@@ -471,7 +449,6 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
         try (Connection conn = getConnection(false, null)) {
             services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            putUnlockKVInSysMutex(mutexKey);
             FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
             FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
             Thread t1 = new Thread(task1);
@@ -509,12 +486,15 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         public Void call() throws Exception {
             try {
                 ((ConnectionQueryServicesImpl)services).acquireUpgradeMutex(
-                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
+                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
                 acquireStatus.set(true);
             } catch (UpgradeInProgressException e) {
                 numExceptions.incrementAndGet();
             } finally {
                 latch.countDown();
+                if (acquireStatus.get()) {
+                    ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex();
+                }
             }
             return null;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index fdfd75b..fda9490 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -20,11 +20,13 @@ package org.apache.phoenix.end2end;
 import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MODIFY_VIEW_PK;
 import static org.apache.phoenix.exception.SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.analyzeTable;
 import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -43,6 +45,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -52,9 +61,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -67,6 +76,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.ColumnAlreadyExistsException;
+import org.apache.phoenix.schema.ConcurrentTableMutationException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
@@ -74,6 +84,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -93,6 +104,17 @@ public class ViewIT extends SplitSystemCatalogIT {
 
     protected String tableDDLOptions;
     protected boolean transactional;
+    
+    private static final String FAILED_VIEWNAME = "FAILED_VIEW";
+    private static final byte[] FAILED_ROWKEY_BYTES =
+            SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), Bytes.toBytes(FAILED_VIEWNAME));
+    private static final String SLOW_VIEWNAME_PREFIX = "SLOW_VIEW";
+    private static final byte[] SLOW_ROWKEY_PREFIX_BYTES =
+            SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2),
+                Bytes.toBytes(SLOW_VIEWNAME_PREFIX));
+
+    private static volatile CountDownLatch latch1 = null;
+    private static volatile CountDownLatch latch2 = null;
 
     public ViewIT(boolean transactional) {
         StringBuilder optionBuilder = new StringBuilder();
@@ -114,7 +136,7 @@ public class ViewIT extends SplitSystemCatalogIT {
         Map<String, String> props = Collections.emptyMap();
         boolean splitSystemCatalog = (driver == null);
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
-        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        serverProps.put("hbase.coprocessor.region.classes", TestMetaDataRegionObserver.class.getName());
         serverProps.put("hbase.coprocessor.abortonerror", "false");
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(props.entrySet().iterator()));
         // Split SYSTEM.CATALOG once after the mini-cluster is started
@@ -123,6 +145,49 @@ public class ViewIT extends SplitSystemCatalogIT {
         }
     }
     
+    public static class TestMetaDataRegionObserver extends BaseRegionObserver {
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+            if (shouldFail(c, miniBatchOp.getOperation(0))) {
+                // throwing anything other than instances of IOException results
+                // in this coprocessor being unloaded
+                // DoNotRetryIOException tells HBase not to retry this mutation
+                // multiple times
+                throw new DoNotRetryIOException();
+            } else if (shouldSlowDown(c, miniBatchOp.getOperation(0))) {
+                // simulate a slow write to SYSTEM.CATALOG
+                if (latch1 != null) {
+                    latch1.countDown();
+                }
+                if (latch2 != null) {
+                    try {
+                        // wait till the second task is complete before completing the first task
+                        boolean result = latch2.await(2, TimeUnit.MINUTES);
+                        if (!result) {
+                            throw new RuntimeException("Second task took too long to complete");
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        }
+
+        private boolean shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) {
+            TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
+            return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                    && (Bytes.equals(FAILED_ROWKEY_BYTES, m.getRow()));
+        }
+
+        private boolean shouldSlowDown(ObserverContext<RegionCoprocessorEnvironment> c,
+                Mutation m) {
+            TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
+            byte[] rowKeyPrefix = Arrays.copyOf(m.getRow(), SLOW_ROWKEY_PREFIX_BYTES.length);
+            return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                    && (Bytes.equals(SLOW_ROWKEY_PREFIX_BYTES, rowKeyPrefix));
+        }
+    }
+    
     @Test
     public void testReadOnlyOnUpdatableView() throws Exception {
         String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
@@ -1274,28 +1339,160 @@ public class ViewIT extends SplitSystemCatalogIT {
         PhoenixRuntime.getTableNoCache(conn, fullViewName2);
     }
     
-    private static final String FAILED_VIEWNAME = "FAILED_VIEW";
-    private static final byte[] ROWKEY_TO_FAIL_BYTES = SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2),
-            Bytes.toBytes(FAILED_VIEWNAME));
-    
-    public static class FailingRegionObserver extends SimpleRegionObserver {
+    @Test
+    public void testConcurrentViewCreationAndTableDrop() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+            String fullViewName1 =
+                    SchemaUtil.getTableName(SCHEMA2,
+                        SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName());
+            String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+            latch1 = new CountDownLatch(1);
+            latch2 = new CountDownLatch(1);
+            String tableDdl =
+                    "CREATE TABLE " + fullTableName + "  (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)"
+                            + tableDDLOptions;
+            conn.createStatement().execute(tableDdl);
+
+            ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread t = Executors.defaultThreadFactory().newThread(r);
+                    t.setDaemon(true);
+                    t.setPriority(Thread.MIN_PRIORITY);
+                    return t;
+                }
+            });
+
+            // create the view in a separate thread (which will take some time
+            // to complete)
+            Future<Exception> future =
+                    executorService.submit(new CreateViewRunnable(fullTableName, fullViewName1));
+            // wait till the thread makes the rpc to create the view
+            latch1.await();
+            tableDdl = "DROP TABLE " + fullTableName;
+            try {
+                // drop table should fail as we are concurrently adding a view
+                conn.createStatement().execute(tableDdl);
+                fail("Dropping the base table while a view is concurrently being created should fail");
+            } catch (ConcurrentTableMutationException e) {
+            }
+            latch2.countDown();
+
+            Exception e = future.get();
+            assertNull(e);
+
+            // create another view to ensure that the cell used to prevent
+            // concurrent modifications was removed
+            String ddl =
+                    "CREATE VIEW " + fullViewName2 + " (v2 VARCHAR) AS SELECT * FROM "
+                            + fullTableName + " WHERE k = 6";
+            conn.createStatement().execute(ddl);
+        }
+    }
+
+    @Test
+    public void testConcurrentAddColumn() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+            String fullViewName1 =
+                    SchemaUtil.getTableName(SCHEMA2,
+                        SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName());
+            String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+            // create base table
+            String tableDdl =
+                    "CREATE TABLE " + fullTableName + "  (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)"
+                            + tableDDLOptions;
+            conn.createStatement().execute(tableDdl);
+            // create a view
+            String ddl =
+                    "CREATE VIEW " + fullViewName1 + " (v2 VARCHAR) AS SELECT * FROM "
+                            + fullTableName + " WHERE k = 6";
+            conn.createStatement().execute(ddl);
+
+            latch1 = new CountDownLatch(1);
+            latch2 = new CountDownLatch(1);
+            ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread t = Executors.defaultThreadFactory().newThread(r);
+                    t.setDaemon(true);
+                    t.setPriority(Thread.MIN_PRIORITY);
+                    return t;
+                }
+            });
+
+            // add a column to the view in a separate thread (which will take
+            // some time to complete)
+            Future<Exception> future = executorService.submit(new AddColumnRunnable(fullViewName1));
+            // wait till the thread makes the rpc to add the column to the view
+            boolean result = latch1.await(2, TimeUnit.MINUTES);
+            if (!result) {
+                fail("The alter view rpc took too long");
+            }
+            tableDdl = "ALTER TABLE " + fullTableName + " ADD v3 INTEGER";
+            try {
+                // add the same column to the base table with a different type
+                conn.createStatement().execute(tableDdl);
+                fail("Adding a column to the base table while a column is concurrently being added to a view should fail");
+            } catch (ConcurrentTableMutationException e) {
+            }
+            latch2.countDown();
+
+            Exception e = future.get();
+            assertNull(e);
+
+            // add the same column to another view to ensure that the cell used
+            // to prevent concurrent modifications was removed
+            ddl = "CREATE VIEW " + fullViewName2 + " (v2 VARCHAR) AS SELECT * FROM "
+                    + fullTableName + " WHERE k = 6";
+            conn.createStatement().execute(ddl);
+            tableDdl = "ALTER VIEW " + fullViewName2 + " ADD v3 INTEGER";
+            conn.createStatement().execute(tableDdl);
+        }
+    }
+
+    private class CreateViewRunnable implements Callable<Exception> {
+        private final String fullTableName;
+        private final String fullViewName;
+
+        public CreateViewRunnable(String fullTableName, String fullViewName) {
+            this.fullTableName = fullTableName;
+            this.fullViewName = fullViewName;
+        }
+
         @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-            if (shouldFail(c, miniBatchOp.getOperation(0))) {
-                // throwing anything other than instances of IOException result
-                // in this coprocessor being unloaded
-                // DoNotRetryIOException tells HBase not to retry this mutation
-                // multiple times
-                throw new DoNotRetryIOException();
+        public Exception call() throws Exception {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                String ddl =
+                        "CREATE VIEW " + fullViewName + " (v2 VARCHAR) AS SELECT * FROM "
+                                + fullTableName + " WHERE k = 5";
+                conn.createStatement().execute(ddl);
+            } catch (SQLException e) {
+                return e;
             }
+            return null;
         }
+    }
 
-        private boolean shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) {
-            TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
-            return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
-                    && (Bytes.equals(ROWKEY_TO_FAIL_BYTES, m.getRow()));
+    private class AddColumnRunnable implements Callable<Exception> {
+        private final String fullViewName;
+
+        public AddColumnRunnable(String fullViewName) {
+            this.fullViewName = fullViewName;
         }
 
+        @Override
+        public Exception call() throws Exception {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                String ddl = "ALTER VIEW " + fullViewName + " ADD v3 CHAR(15)";
+                conn.createStatement().execute(ddl);
+            } catch (SQLException e) {
+                return e;
+            }
+            return null;
+        }
     }
 }
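
A note on the test scaffolding above: the new ViewIT tests coordinate the "slow" server-side write and the conflicting client operation with two CountDownLatches. A self-contained sketch of that handshake, independent of Phoenix (all names here are illustrative):

    // Sketch of the two-latch handshake used by the tests above: "started"
    // signals that the slow task is in flight, "mayFinish" releases it once
    // the main thread has issued the conflicting operation.
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class LatchHandshakeExample {
        public static void main(String[] args) throws Exception {
            final CountDownLatch started = new CountDownLatch(1);
            final CountDownLatch mayFinish = new CountDownLatch(1);
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<Void> slowTask = pool.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    started.countDown();                  // tell the main thread we are running
                    mayFinish.await(2, TimeUnit.MINUTES); // block until released
                    return null;
                }
            });
            started.await();       // wait until the slow task is in flight
            // ... the main thread would issue the conflicting DDL here and expect it to fail ...
            mayFinish.countDown();  // let the slow task complete
            slowTask.get();
            pool.shutdown();
        }
    }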

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index dab1048..7a4a481 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -331,6 +331,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String SEQUENCE_TABLE_TYPE = SYSTEM_SEQUENCE_TABLE;
 
     public static final String SYNC_INDEX_CREATED_DATE = "SYNC_INDEX_CREATED_DATE";
+    public static final String SYSTEM_MUTEX_COLUMN_NAME = "MUTEX_VALUE";
+    public static final byte[] SYSTEM_MUTEX_COLUMN_NAME_BYTES = Bytes.toBytes(SYSTEM_MUTEX_COLUMN_NAME);
     public static final String SYSTEM_MUTEX_TABLE_NAME = "MUTEX";
     public static final String SYSTEM_MUTEX_NAME = SchemaUtil.getTableName(QueryConstants.SYSTEM_SCHEMA_NAME, SYSTEM_MUTEX_TABLE_NAME);
     public static final TableName SYSTEM_MUTEX_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_MUTEX_NAME);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 6f8cbc0..0820232 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -169,4 +169,18 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public QueryLoggerDisruptor getQueryDisruptor();
     
     public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider);
+    
+    /**
+     * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single client can execute a
+     * particular task. The params are used to generate the rowkey.
+     * @return true if this client was able to successfully acquire the mutex
+     */
+    public boolean writeMutexCell(String tenantId, String schemaName, String tableName,
+            String columnName, String familyName) throws SQLException;
+
+    /**
+     * Deletes a cell that was written to SYSTEM.MUTEX. The params are used to generate the rowkey.
+     */
+    public void deleteMutexCell(String tenantId, String schemaName, String tableName,
+            String columnName, String familyName) throws SQLException;
 }
\ No newline at end of file
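
Taken together, the two new interface methods give callers a simple acquire/try/finally-release protocol around SYSTEM.MUTEX. A minimal caller-side sketch follows; obtaining the ConnectionQueryServices from a PhoenixConnection mirrors what the tests do, but the method name, error handling, and imports are assumed for the example and are not part of this change:

    // Illustrative caller-side use of the new mutex methods.
    private static void runExclusively(PhoenixConnection conn, String schemaName,
            String tableName) throws SQLException {
        ConnectionQueryServices services = conn.getQueryServices();
        // writeMutexCell returns false if another client already holds the cell
        if (!services.writeMutexCell(null, schemaName, tableName, null, null)) {
            throw new SQLException("Could not acquire SYSTEM.MUTEX cell for " + tableName);
        }
        try {
            // perform the DDL change that must not run concurrently
        } finally {
            // always remove the cell so later operations are not blocked
            services.deleteMutexCell(null, schemaName, tableName, null, null);
        }
    }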

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4c7630d..8bbb379 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -64,6 +64,7 @@ import static org.apache.phoenix.util.UpgradeUtil.moveChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.lang.ref.WeakReference;
 import java.sql.PreparedStatement;
 import java.sql.ResultSetMetaData;
@@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -326,9 +328,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
     private final int maxConnectionsAllowed;
     private final boolean shouldThrottleNumConnections;
-    public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
-    public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes();
-    public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes();
+    public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes();
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -2502,6 +2502,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     protected String getChildLinkDDL() {
         return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
     }
+    
+    protected String getMutexDDL() {
+        return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
+    }
 
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
@@ -2673,13 +2677,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time
             tableDesc.addFamily(columnDesc);
             admin.createTable(tableDesc);
-            try (HTableInterface sysMutexTable = getTable(mutexTableName.getName())) {
-                byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
-                Put put = new Put(mutexRowKey);
-                put.addColumn(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED);
-                sysMutexTable.put(put);
-            }
         } catch (IOException e) {
             if(!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class)) ||
                     !Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), org.apache.hadoop.hbase.TableNotFoundException.class))) {
@@ -2687,13 +2684,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } else {
                 throw e;
             }
-        }catch(PhoenixIOException e){
-            if(e.getCause()!=null && e.getCause() instanceof AccessDeniedException)
-            {
-                //Ignore
-            }else{
-                throw e;
-            }
         }
     }
 
@@ -2723,13 +2713,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             metaConnection.createStatement().executeUpdate(getChildLinkDDL());
         } catch (TableAlreadyExistsException e) {}
-        // Catch the IOException to log the error message and then bubble it up for the client to retry.
         try {
-            createSysMutexTableIfNotExists(hbaseAdmin);
-        } catch (IOException exception) {
-            logger.error("Failed to created SYSMUTEX table. Upgrade or migration is not possible without it. Please retry.");
-            throw exception;
-        }
+            metaConnection.createStatement().executeUpdate(getMutexDDL());
+        } catch (TableAlreadyExistsException e) {}
+        // Catch the IOException to log the error message and then bubble it up for the client to retry.
     }
 
     /**
@@ -3022,8 +3009,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         String sysCatalogTableName = null;
         SQLException toThrow = null;
         boolean acquiredMutexLock = false;
-        byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
         boolean snapshotCreated = false;
         try {
             if (!isUpgradeRequired()) {
@@ -3054,7 +3039,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 sysCatalogTableName = SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getNameAsString();
                 if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) {
                     // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table.
-                    if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey)) {
+                    if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP)) {
                         logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace "
                           + "and/or upgrading " + sysCatalogTableName);
                     }
@@ -3073,7 +3058,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     // Try acquiring a lock in SYSMUTEX table before upgrading SYSCAT. If we cannot acquire the lock,
                     // it means some old client is either migrating SYSTEM tables or trying to upgrade the schema of
                     // SYSCAT table and hence it should not be interrupted
-                    if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp, mutexRowKey)) {
+                    if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp)) {
                         logger.debug("Acquired lock in SYSMUTEX table for upgrading " + sysCatalogTableName);
                         snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
                         createSnapshot(snapshotName, sysCatalogTableName);
@@ -3173,6 +3158,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             try {
                 metaConnection.createStatement().executeUpdate(getChildLinkDDL());
             } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+            try {
+                metaConnection.createStatement().executeUpdate(getMutexDDL());
+            } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
 
             // In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
             // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
@@ -3216,7 +3204,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 } finally {
                     if (acquiredMutexLock) {
                         try {
-                            releaseUpgradeMutex(mutexRowKey);
+                            releaseUpgradeMutex();
                         } catch (IOException e) {
                             logger.warn("Release of upgrade mutex failed ", e);
                         }
@@ -3410,17 +3398,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
             if (tableNames.size() == 0) { return; }
             // Try to move any remaining tables matching "SYSTEM\..*" into "SYSTEM:"
-            if (tableNames.size() > 5) {
-                logger.warn("Expected 5 system tables but found " + tableNames.size() + ":" + tableNames);
+            if (tableNames.size() > 7) {
+                logger.warn("Expected 7 system tables but found " + tableNames.size() + ":" + tableNames);
             }
 
-            // Handle the upgrade of SYSMUTEX table separately since it doesn't have any entries in SYSCAT
-            logger.info("Migrating SYSTEM.MUTEX table to SYSTEM namespace.");
-            String sysMutexSrcTableName = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
-            String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), this.getProps()).getNameAsString();
-            UpgradeUtil.mapTableToNamespace(admin, sysMutexSrcTableName, sysMutexDestTableName, PTableType.SYSTEM);
-            tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME);
-
             byte[] mappedSystemTable = SchemaUtil
                     .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName();
             metatable = getTable(mappedSystemTable);
@@ -3464,64 +3445,95 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * @throws SQLException
      */
     @VisibleForTesting
-    public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException,
+    public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp)
+            throws IOException,
             SQLException {
         Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
-
         byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
         if(sysMutexPhysicalTableNameBytes == null) {
             throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
                     getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
         }
+        if (!writeMutexCell(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+            PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, null, null)) {
+            throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
+                    getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
+        }
+        return true;
+    }
 
-        try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
-            byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
-            byte[] qualifier = UPGRADE_MUTEX;
-            byte[] oldValue = UPGRADE_MUTEX_UNLOCKED;
-            byte[] newValue = UPGRADE_MUTEX_LOCKED;
-            Put put = new Put(rowToLock);
-            put.addColumn(family, qualifier, newValue);
-            boolean acquired =  sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put);
-            if (!acquired) {
-                /*
-                 * Because of TTL on the SYSTEM_MUTEX_FAMILY, it is very much possible that the cell
-                 * has gone away. So we need to retry with an old value of null. Note there is a small
-                 * race condition here that between the two checkAndPut calls, it is possible that another
-                 * request would have set the value back to UPGRADE_MUTEX_UNLOCKED. In that scenario this
-                 * following checkAndPut would still return false even though the lock was available.
-                 */
-                acquired =  sysMutexTable.checkAndPut(rowToLock, family, qualifier, null, put);
-                if (!acquired) {
-                    throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
-                        getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
+    @Override
+    public boolean writeMutexCell(String tenantId, String schemaName, String tableName,
+            String columnName, String familyName) throws SQLException {
+        try {
+            byte[] rowKey =
+                    columnName != null
+                            ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName,
+                                familyName)
+                            : SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+            // at this point the system mutex table should have been created or
+            // an exception thrown
+            byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
+            try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
+                byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+                byte[] qualifier = PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
+                byte[] value = MUTEX_LOCKED;
+                Put put = new Put(rowKey);
+                put.addColumn(family, qualifier, value);
+                boolean checkAndPut =
+                        sysMutexTable.checkAndPut(rowKey, family, qualifier, null, put);
+                String processName = ManagementFactory.getRuntimeMXBean().getName();
+                String msg =
+                        " tenantId : " + tenantId + " schemaName : " + schemaName + " tableName : "
+                                + tableName + " columnName : " + columnName + " familyName : "
+                                + familyName;
+                if (!checkAndPut) {
+                    logger.error(processName + " failed to acquire mutex for "+ msg);
+                }
+                else {
+                    logger.debug(processName + " acquired mutex for "+ msg);
                 }
+                return checkAndPut;
             }
-            return true;
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
         }
     }
 
     @VisibleForTesting
-    public boolean releaseUpgradeMutex(byte[] mutexRowKey) throws IOException, 
SQLException {
-        boolean released = false;
-
-        byte[] sysMutexPhysicalTableNameBytes = 
getSysMutexPhysicalTableNameBytes();
-        if(sysMutexPhysicalTableNameBytes == null) {
-            // We shouldn't never be really in this situation where neither 
SYSMUTEX or SYS:MUTEX exists
-            return true;
-        }
+    public void releaseUpgradeMutex() throws IOException, SQLException {
+        deleteMutexCell(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+            PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, null, null);
+    }
 
-        try (HTableInterface sysMutexTable = 
getTable(sysMutexPhysicalTableNameBytes)) {
-            byte[] family = 
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
-            byte[] qualifier = UPGRADE_MUTEX;
-            byte[] expectedValue = UPGRADE_MUTEX_LOCKED;
-            byte[] newValue = UPGRADE_MUTEX_UNLOCKED;
-            Put put = new Put(mutexRowKey);
-            put.addColumn(family, qualifier, newValue);
-            released = sysMutexTable.checkAndPut(mutexRowKey, family, 
qualifier, expectedValue, put);
-        } catch (Exception e) {
-            logger.warn("Release of upgrade mutex failed", e);
+    @Override
+    public void deleteMutexCell(String tenantId, String schemaName, String 
tableName,
+            String columnName, String familyName) throws SQLException {
+        try {
+            byte[] rowKey =
+                    columnName != null
+                            ? SchemaUtil.getColumnKey(tenantId, schemaName, 
tableName, columnName,
+                                familyName)
+                            : SchemaUtil.getTableKey(tenantId, schemaName, 
tableName);
+            // at this point the system mutex table should have been created or
+            // an exception thrown
+            byte[] sysMutexPhysicalTableNameBytes = 
getSysMutexPhysicalTableNameBytes();
+            try (HTableInterface sysMutexTable = 
getTable(sysMutexPhysicalTableNameBytes)) {
+                byte[] family = 
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+                byte[] qualifier = 
PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
+                Delete delete = new Delete(rowKey);
+                delete.addColumn(family, qualifier);
+                sysMutexTable.delete(delete);
+                String processName = 
ManagementFactory.getRuntimeMXBean().getName();
+                String msg =
+                        " tenantId : " + tenantId + " schemaName : " + 
schemaName + " tableName : "
+                                + tableName + " columnName : " + columnName + 
" familyName : "
+                                + familyName;
+                logger.debug(processName + " released mutex for "+ msg);
+            }
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
         }
-        return released;
     }
 
     private byte[] getSysMutexPhysicalTableNameBytes() throws IOException, 
SQLException {

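The writeMutexCell / deleteMutexCell pair above boils down to a simple cell-based lock: a checkAndPut against SYSTEM.MUTEX that only succeeds when no cell exists yet, and a delete of the same cell to release it. A minimal sketch of that protocol follows, assuming an already-open HTableInterface for the mutex table; the MutexCellSketch class, the runWithMutex helper and its Runnable parameter are illustrative only and are not part of this change.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;

    final class MutexCellSketch {
        // Acquire the mutex cell, run the guarded work, then release the cell.
        static boolean runWithMutex(HTableInterface sysMutexTable, byte[] rowKey,
                byte[] family, byte[] qualifier, byte[] lockedValue, Runnable work)
                throws IOException {
            Put put = new Put(rowKey);
            put.addColumn(family, qualifier, lockedValue);
            // A null expected value means the Put is only applied if the cell does
            // not exist yet, so exactly one concurrent caller wins the mutex.
            if (!sysMutexTable.checkAndPut(rowKey, family, qualifier, null, put)) {
                return false; // another client already holds the mutex
            }
            try {
                work.run();
                return true;
            } finally {
                // Release by deleting the cell; the TTL on the mutex column family
                // also cleans up cells left behind by clients that die mid-operation.
                Delete delete = new Delete(rowKey);
                delete.addColumn(family, qualifier);
                sysMutexTable.delete(delete);
            }
        }
    }

Because checkAndPut is applied atomically on the region server, this locking scheme needs no external coordination service.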
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 5a46214..f088172 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -180,6 +180,10 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     protected String getChildLinkDDL() {
         return 
setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
     }
+    
+    protected String getMutexDDL() {
+        return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
+    }
 
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
@@ -379,6 +383,11 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
                             .executeUpdate(getChildLinkDDL());
                 } catch (NewerTableAlreadyExistsException ignore) {
                 }
+                try {
+                    metaConnection.createStatement()
+                            .executeUpdate(getMutexDDL());
+                } catch (NewerTableAlreadyExistsException ignore) {
+                }
             } catch (SQLException e) {
                 sqlE = e;
             } finally {
@@ -730,4 +739,15 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public PhoenixTransactionClient initTransactionClient(Provider provider) {
         return null; // Client is not necessary
     }
+
+    @Override
+    public boolean writeMutexCell(String tenantId, String schemaName, String 
tableName,
+            String columnName, String familyName) throws SQLException {
+        return true;
+    }
+
+    @Override
+    public void deleteMutexCell(String tenantId, String schemaName, String 
tableName,
+            String columnName, String familyName) throws SQLException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index b3e2cb2..147e873 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -370,4 +370,15 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public PhoenixTransactionClient initTransactionClient(Provider provider) {
         return getDelegate().initTransactionClient(provider);
     }
+
+    @Override
+    public boolean writeMutexCell(String tenantId, String schemaName, String 
tableName,
+            String columnName, String familyName) throws SQLException {
+        return true;
+    }
+
+    @Override
+    public void deleteMutexCell(String tenantId, String schemaName, String 
tableName,
+            String columnName, String familyName) throws SQLException {
+    }
 }
\ No newline at end of file

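The delegate wrapper above stubs the two new methods out (writeMutexCell always reports success and deleteMutexCell is a no-op) rather than forwarding them. If a wrapper did need the underlying implementation's mutex behaviour, a forwarding override would look roughly like the following hypothetical subclass, which is not part of this change.

    import java.sql.SQLException;
    import org.apache.phoenix.query.ConnectionQueryServices;
    import org.apache.phoenix.query.DelegateConnectionQueryServices;

    // Hypothetical wrapper that forwards the mutex calls to the wrapped services.
    public class ForwardingMutexQueryServices extends DelegateConnectionQueryServices {
        public ForwardingMutexQueryServices(ConnectionQueryServices delegate) {
            super(delegate);
        }

        @Override
        public boolean writeMutexCell(String tenantId, String schemaName, String tableName,
                String columnName, String familyName) throws SQLException {
            return getDelegate().writeMutexCell(tenantId, schemaName, tableName, columnName, familyName);
        }

        @Override
        public void deleteMutexCell(String tenantId, String schemaName, String tableName,
                String columnName, String familyName) throws SQLException {
            getDelegate().deleteMutexCell(tenantId, schemaName, tableName, columnName, familyName);
        }
    }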
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8d8d47f..32fedc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -355,5 +355,19 @@ public interface QueryConstants {
                        + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + 
HConstants.VERSIONS + "=%s,\n"
                        + HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + 
PhoenixDatabaseMetaData.TRANSACTIONAL + "="
                        + Boolean.FALSE;
+       
+        public static final String CREATE_MUTEX_METADTA =
+                   "CREATE IMMUTABLE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_MUTEX_TABLE_NAME + "\"(\n" +
+                   // Pk columns
+                   TENANT_ID + " VARCHAR NULL," +
+                   TABLE_SCHEM + " VARCHAR NULL," +
+                   TABLE_NAME + " VARCHAR NOT NULL," +
+                   COLUMN_NAME + " VARCHAR NULL," + // null for table row
+                   COLUMN_FAMILY + " VARCHAR NULL " + // column family, used 
to make column rows unique
+                   "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + ","
+                   + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," 
+ COLUMN_FAMILY + "))\n" +
+                   HConstants.VERSIONS + "=%s,\n" +
+                   HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" +
+                   PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
     
 }
\ No newline at end of file

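CREATE_MUTEX_METADTA above is a format string: the two %s placeholders (after HConstants.VERSIONS and HColumnDescriptor.KEEP_DELETED_CELLS) are filled in by setSystemDDLProperties before the DDL is executed. A sketch of that expansion follows; the literal "1" and "false" are placeholder values chosen for illustration, not the configured defaults.

    // Illustrative only: the real values come from the client's QueryServices
    // configuration via setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA).
    String mutexDdl = String.format(QueryConstants.CREATE_MUTEX_METADTA,
            "1",       // %s following HConstants.VERSIONS
            "false");  // %s following HColumnDescriptor.KEEP_DELETED_CELLS
    // The resulting statement creates SYSTEM."MUTEX" with the five-part primary key
    // (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY).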
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0d7daec/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 625d03f..c714eab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1874,6 +1874,24 @@ public class MetaDataClient {
         }
         return false;
     }
+    
+    /**
+     * If we are creating a view, we write a cell to the SYSTEM.MUTEX table 
with the rowkey of the
+     * parent table to prevent concurrent modifications.
+     */
+    private boolean writeCell(String tenantId, String schemaName, String 
tableName, String columnName)
+            throws SQLException {
+        return connection.getQueryServices().writeMutexCell(tenantId, 
schemaName, tableName, columnName, null);
+    }
+
+    /**
+     * Remove the cell that was written to the SYSTEM.MUTEX table with the 
rowkey of the
+     * parent table to prevent concurrent modifications.
+     */
+    private void deleteCell(String tenantId, String schemaName, String 
tableName, String columnName)
+            throws SQLException {
+        connection.getQueryServices().deleteMutexCell(tenantId, schemaName, 
tableName, columnName, null);
+    }
 
     private PTable createTableInternal(CreateTableStatement statement, 
byte[][] splits,
             final PTable parent, String viewStatement, ViewType viewType,
@@ -1884,6 +1902,7 @@ public class MetaDataClient {
         final PTableType tableType = statement.getTableType();
         boolean wasAutoCommit = connection.getAutoCommit();
         connection.rollback();
+        boolean acquiredMutex = false;
         try {
             connection.setAutoCommit(false);
             List<Mutation> tableMetaData = 
Lists.newArrayListWithExpectedSize(statement.getColumnDefs().size() + 3);
@@ -1913,6 +1932,21 @@ public class MetaDataClient {
             boolean isLocalIndex = indexType == IndexType.LOCAL;
             QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
             ImmutableStorageScheme immutableStorageScheme = 
ONE_CELL_PER_COLUMN;
+            
+            if (tableType == PTableType.VIEW) {
+                PName physicalName = parent.getPhysicalName();
+                String physicalSchemaName =
+                        
SchemaUtil.getSchemaNameFromFullName(physicalName.getString());
+                String physicalTableName =
+                        
SchemaUtil.getTableNameFromFullName(physicalName.getString());
+                // acquire the mutex using the global physical table name to
+                // prevent creating views while concurrently dropping the base
+                // table
+                acquiredMutex = writeCell(null, physicalSchemaName, 
physicalTableName, null);
+                if (!acquiredMutex) {
+                    throw new 
ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
+                }
+            }
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactionProvider != null, transactionProvider);
                 storeNulls = parent.getStoreNulls();
@@ -2833,6 +2867,16 @@ public class MetaDataClient {
             }
         } finally {
             connection.setAutoCommit(wasAutoCommit);
+            if (acquiredMutex && tableType == PTableType.VIEW) {
+                PName physicalName = parent.getPhysicalName();
+                String physicalSchemaName =
+                        
SchemaUtil.getSchemaNameFromFullName(physicalName.getString());
+                String physicalTableName =
+                        
SchemaUtil.getTableNameFromFullName(physicalName.getString());
+                // releasing mutex on the table (required to prevent creating 
views while concurrently
+                // dropping the base table)
+                deleteCell(null, physicalSchemaName, physicalTableName, null);
+            }
         }
     }
 
@@ -2942,9 +2986,11 @@ public class MetaDataClient {
             boolean ifExists, boolean cascade, boolean 
skipAddingParentColumns) throws SQLException {
         connection.rollback();
         boolean wasAutoCommit = connection.getAutoCommit();
+        PName tenantId = connection.getTenantId();
+        String tenantIdStr = tenantId == null ? null : tenantId.getString();
+        boolean acquiredMutex = false;
+        String physicalTableName = SchemaUtil.getTableName(schemaName, 
tableName);
         try {
-            PName tenantId = connection.getTenantId();
-            String tenantIdStr = tenantId == null ? null : 
tenantId.getString();
             byte[] key = SchemaUtil.getTableKey(tenantIdStr, schemaName, 
tableName);
             Long scn = connection.getSCN();
             long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : 
scn;
@@ -2957,6 +3003,14 @@ public class MetaDataClient {
                 Delete linkDelete = new Delete(linkKey, clientTimeStamp);
                 tableMetaData.add(linkDelete);
             }
+            if (tableType == PTableType.TABLE) {
+                // acquire a mutex on the table to prevent creating views 
while concurrently
+                // dropping the base table
+                acquiredMutex = writeCell(null, schemaName, tableName, null);
+                if (!acquiredMutex) {
+                    throw new ConcurrentTableMutationException(schemaName, 
tableName);
+                }
+            }
             MetaDataMutationResult result = 
connection.getQueryServices().dropTable(tableMetaData, tableType, cascade, 
skipAddingParentColumns);
             MutationCode code = result.getMutationCode();
             PTable table = result.getTable();
@@ -3034,6 +3088,11 @@ public class MetaDataClient {
             return new MutationState(0, 0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
+            // releasing mutex on the table (required to prevent creating 
views while concurrently
+            // dropping the base table)
+            if (acquiredMutex && tableType == PTableType.TABLE) {
+                deleteCell(null, schemaName, tableName, null);
+            }
         }
     }
 
@@ -3252,11 +3311,18 @@ public class MetaDataClient {
                     throws SQLException {
         connection.rollback();
         boolean wasAutoCommit = connection.getAutoCommit();
+               List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(origColumnDefs != null ? 
origColumnDefs.size() : 0);
+        PName tenantId = connection.getTenantId();
+        String schemaName = table.getSchemaName().getString();
+        String tableName = table.getTableName().getString();
+        PName physicalName = table.getPhysicalName();
+        String physicalSchemaName =
+                SchemaUtil.getSchemaNameFromFullName(physicalName.getString());
+        String physicalTableName =
+                SchemaUtil.getTableNameFromFullName(physicalName.getString());
+        Set<String> acquiredColumnMutexSet = 
Sets.newHashSetWithExpectedSize(3);
         try {
             connection.setAutoCommit(false);
-            PName tenantId = connection.getTenantId();
-            String schemaName = table.getSchemaName().getString();
-            String tableName = table.getTableName().getString();
 
             List<ColumnDef> columnDefs = null;
             if (table.isAppendOnlySchema()) {
@@ -3337,7 +3403,6 @@ public class MetaDataClient {
                 boolean willBeTxnl = metaProperties.getNonTxToTx();
                 Long timeStamp = TransactionUtil.getTableTimestamp(connection, 
table.isTransactional() || willBeTxnl, table.isTransactional() ? 
table.getTransactionProvider() : 
metaPropertiesEvaluated.getTransactionProvider());
                 int numPkColumnsAdded = 0;
-                List<PColumn> columns = 
Lists.newArrayListWithExpectedSize(numCols);
                 Set<String> colFamiliesForPColumnsToBeAdded = new 
LinkedHashSet<>();
                 Set<String> families = new LinkedHashSet<>();
                 PTable tableForCQCounters = tableType == PTableType.VIEW ? 
PhoenixRuntime.getTable(connection, table.getPhysicalName().getString()) : 
table;
@@ -3534,6 +3599,18 @@ public class MetaDataClient {
                     }
                 }
 
+                boolean acquiredMutex = true;
+                for (PColumn pColumn : columns) {
+                    // acquire the mutex using the global physical table name 
to
+                    // prevent creating the same column on a table or view with
+                    // a conflicting type etc
+                    acquiredMutex = writeCell(null, physicalSchemaName, 
physicalTableName,
+                        pColumn.getName().getString());
+                    if (!acquiredMutex) {
+                        throw new 
ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
+                    }
+                    acquiredColumnMutexSet.add(pColumn.getName().getString());
+                }
                 MetaDataMutationResult result = 
connection.getQueryServices().addColumn(tableMetaData, table, properties, 
colFamiliesForPColumnsToBeAdded, columns);
                 try {
                     MutationCode code = processMutationResult(schemaName, 
tableName, result);
@@ -3604,6 +3681,12 @@ public class MetaDataClient {
             }
         } finally {
             connection.setAutoCommit(wasAutoCommit);
+            if (!acquiredColumnMutexSet.isEmpty()) {
+                for (String columnName : acquiredColumnMutexSet) {
+                    // release the mutex (used to prevent concurrent 
conflicting add column changes)
+                    deleteCell(null, physicalSchemaName, physicalTableName, 
columnName);
+                }
+            }
         }
     }
 

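The add-column path above acquires one mutex cell per column being added, keyed by the global physical table name plus the column name, and releases whatever it managed to acquire in the finally block. A condensed sketch of that flow follows; the columns list, physicalSchemaName and physicalTableName are assumed to be in scope as in MetaDataClient, and writeCell / deleteCell are the private helpers introduced in this change.

    // Condensed sketch of the per-column mutex handling in addColumn.
    Set<String> acquiredColumnMutexSet = new HashSet<>();
    try {
        for (PColumn pColumn : columns) {
            String columnName = pColumn.getName().getString();
            // One mutex cell per column on the physical table, so two clients cannot
            // concurrently add the same column with conflicting definitions.
            if (!writeCell(null, physicalSchemaName, physicalTableName, columnName)) {
                throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
            }
            acquiredColumnMutexSet.add(columnName);
        }
        // ... send the ADD COLUMN metadata mutations to the server ...
    } finally {
        for (String columnName : acquiredColumnMutexSet) {
            deleteCell(null, physicalSchemaName, physicalTableName, columnName);
        }
    }

The table-level create-view and drop-table paths follow the same shape, using a single mutex cell keyed by the base physical table name.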