PHOENIX-4855 Continue to write base table column metadata when creating a view 
in order to support rollback


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a6c1aa45
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a6c1aa45
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a6c1aa45

Branch: refs/heads/4.x-cdh5.15
Commit: a6c1aa4531ab57e21174b9c208c3b40e4e845bd5
Parents: adbd986
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Mon Aug 20 18:42:56 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 22:49:38 2018 +0100

----------------------------------------------------------------------
 .../AlterMultiTenantTableWithViewsIT.java       |  25 ++-
 .../phoenix/end2end/AlterTableWithViewsIT.java  | 196 ++++++++++-------
 .../java/org/apache/phoenix/end2end/ViewIT.java |  14 +-
 .../coprocessor/MetaDataEndpointImpl.java       | 208 +++++++++++--------
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   | 158 +++++++++++---
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   |  38 ++--
 .../org/apache/phoenix/schema/PColumnImpl.java  |   3 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   6 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  38 ++--
 10 files changed, 461 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
index 669b6f6..d5e1af2 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
@@ -498,20 +498,24 @@ public class AlterMultiTenantTableWithViewsIT extends 
SplitSystemCatalogIT {
         String tenant = TENANT1;
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection tenant1Conn = getTenantConnection(tenant)) {
-            String baseTableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID 
VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR, V3 VARCHAR 
CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            String baseTableDDL =
+                    "CREATE TABLE " + baseTable
+                            + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT 
NULL, V1 VARCHAR, "
+                            + "V2 VARCHAR, V3 VARCHAR CONSTRAINT NAME_PK 
PRIMARY KEY(TENANT_ID, PK1))"
+                            + " MULTI_TENANT = true, SALT_BUCKETS = 4";
             conn.createStatement().execute(baseTableDDL);
 
             String view1DDL = "CREATE VIEW " + view1 + " ( VIEW_COL1 
DECIMAL(10,2), VIEW_COL2 CHAR(256)) AS SELECT * FROM " + baseTable;
             tenant1Conn.createStatement().execute(view1DDL);
 
-            assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 1, 
5, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V2", "V3");
-            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 7, 5,  "PK1", "V1", "V2", "V3", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 1, 
6, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V2", "V3");
+            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 8, 6,  "PK1", "V1", "V2", "V3", "VIEW_COL1", "VIEW_COL2");
 
             String alterBaseTable = "ALTER TABLE " + baseTable + " ADD KV 
VARCHAR, PK2 VARCHAR PRIMARY KEY";
             conn.createStatement().execute(alterBaseTable);
 
             assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 2, 
7, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V2", "V3", "KV", 
"PK2");
-            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 7, 5,  "PK1", "V1", "V2", "V3", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 8, 6,  "PK1", "V1", "V2", "V3", "KV", "PK2", "VIEW_COL1", 
"VIEW_COL2");
 
             // verify that the both columns were added to view1
             tenant1Conn.createStatement().execute("SELECT KV from " + view1);
@@ -526,21 +530,24 @@ public class AlterMultiTenantTableWithViewsIT extends 
SplitSystemCatalogIT {
         String tenant = TENANT1;
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection tenant1Conn = getTenantConnection(tenant)) {
-            String baseTableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID 
VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR, V3 VARCHAR 
CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            String baseTableDDL =
+                    "CREATE TABLE " + baseTable
+                            + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT 
NULL, V1 VARCHAR, V2 VARCHAR,"
+                            + " V3 VARCHAR CONSTRAINT NAME_PK PRIMARY 
KEY(TENANT_ID, PK1)) "
+                            + "MULTI_TENANT = true , SALT_BUCKETS = 4";
             conn.createStatement().execute(baseTableDDL);
 
             String view1DDL = "CREATE VIEW " + view1 + " ( VIEW_COL1 
DECIMAL(10,2), VIEW_COL2 CHAR(256)) AS SELECT * FROM " + baseTable;
             tenant1Conn.createStatement().execute(view1DDL);
 
-            assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 1, 
5, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V2", "V3");
-            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 7, 5, "PK1", "V1", "V2", "V3", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 1, 
6, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V2", "V3");
+            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 8, 6, "PK1", "V1", "V2", "V3", "VIEW_COL1", "VIEW_COL2");
 
             String alterBaseTable = "ALTER TABLE " + baseTable + " DROP COLUMN 
V2";
             conn.createStatement().execute(alterBaseTable);
 
             assertTableDefinition(conn, baseTable, PTableType.TABLE, null, 2, 
4, BASE_TABLE_BASE_COLUMN_COUNT, "TENANT_ID", "PK1", "V1", "V3");
-            // column adds and drops are no longer propagated to child views, 
when the parent view is resolved the dropped column is excluded
-            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 7, 5, "PK1", "V1",  "V2", "V3", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(tenant1Conn, view1, PTableType.VIEW, 
baseTable, 0, 8, 6, "PK1", "V1",  "V3", "VIEW_COL1", "VIEW_COL2");
 
             // verify that the dropped columns aren't visible
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index e97a40d..b1f0fce 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -73,19 +73,22 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
 
     private final boolean isMultiTenant;
     private final boolean columnEncoded;
+    private final boolean salted;
     private final String TENANT_SPECIFIC_URL1 = getUrl() + ';' + 
TENANT_ID_ATTRIB + "=" + TENANT1;
     private final String TENANT_SPECIFIC_URL2 = getUrl() + ';' + 
TENANT_ID_ATTRIB + "=" + TENANT2;
     
-    public AlterTableWithViewsIT(boolean isMultiTenant, boolean columnEncoded) 
{
-        this.isMultiTenant = isMultiTenant;
+    public AlterTableWithViewsIT(boolean columnEncoded, boolean isMultiTenant, 
boolean salted) {
         this.columnEncoded = columnEncoded;
+        this.isMultiTenant = isMultiTenant;
+        this.salted = salted;
     }
     
-    @Parameters(name="AlterTableWithViewsIT_multiTenant={0}, 
columnEncoded={1}") // name is used by failsafe as file name in reports
+    // name is used by failsafe as file name in reports
+    @Parameters(name = "AlterTableWithViewsIT_columnEncoded={0}, 
multiTenant={1}, salted={2}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { 
-                { false, false }, { false, true },
-                { true, false }, { true, true } });
+        return Arrays.asList(new Boolean[][] { { false, false, false }, { 
false, false, true },
+                { false, true, false }, { false, true, true }, { true, false, 
false },
+                { true, false, true }, { true, true, false }, { true, true, 
true } });
     }
     
     // transform PColumn to String
@@ -112,6 +115,11 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                 optionsBuilder.append(",");
             optionsBuilder.append("MULTI_TENANT=true");
         }
+        if (salted) {
+            if (optionsBuilder.length()!=0)
+                optionsBuilder.append(",");
+            optionsBuilder.append("SALT_BUCKETS=4");
+        }
         return String.format(format, isMultiTenant ? "TENANT_ID VARCHAR NOT 
NULL, " : "",
             isMultiTenant ? "TENANT_ID, " : "", optionsBuilder.toString());
     }
@@ -130,16 +138,17 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
                             + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR ) AS SELECT * FROM " + 
tableName);
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             // adding a new pk column and a new regular column
             conn.createStatement().execute("ALTER TABLE " + tableName + " ADD 
COL3 varchar(10) PRIMARY KEY, COL4 integer");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2", "COL3", "COL4");
-            // add/drop column to a base table are no longer propagated to 
child views
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, false, 
"ID", "COL1", "COL2", "COL3", "COL4");
+            // TODO PHOENIX-4766 add/drop column to a base table are no longer 
propagated to child views
+            // assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "COL3", "COL4", "VIEW_COL1", 
"VIEW_COL2");
         } 
     }
     
@@ -234,22 +243,21 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)" + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
             assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
6,
-                QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", 
"COL2", "COL3", "COL4",
-                "COL5");
+                QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", 
"COL1", "COL2", "COL3",
+                "COL4", "COL5");
 
             viewConn.createStatement()
                     .execute(
                         "CREATE VIEW " + viewOfTable + " ( VIEW_COL1 
DECIMAL(10,2), VIEW_COL2 VARCHAR ) AS SELECT * FROM " + tableName);
             assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 8, 6,
-                "ID", "COL1", "COL2", "COL3", "COL4", "COL5", "VIEW_COL1", 
"VIEW_COL2");
+                true, "ID", "COL1", "COL2", "COL3", "COL4", "COL5", 
"VIEW_COL1", "VIEW_COL2");
 
             // drop two columns from the base table
             conn.createStatement().execute("ALTER TABLE " + tableName + " DROP 
COLUMN COL3, COL5");
             assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 4,
-                QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", 
"COL2", "COL4");
-            // the columns will still exist in the view metadata , but are 
excluded while combining parent table columns
+                QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, false, "ID", 
"COL1", "COL2", "COL4");
             assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 8, 6,
-                "ID", "COL1", "COL2", "COL3", "COL4", "COL5", "VIEW_COL1", 
"VIEW_COL2");
+                true, "ID", "COL1", "COL2", "COL4", "VIEW_COL1", "VIEW_COL2");
         }
     }
     
@@ -270,10 +278,10 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
                             + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
4, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2", "COL3");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
4, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2", 
"COL3");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR(256), VIEW_COL3 VARCHAR, 
VIEW_COL4 DECIMAL, VIEW_COL5 DECIMAL(10,2), VIEW_COL6 VARCHAR, CONSTRAINT pk 
PRIMARY KEY (VIEW_COL5, VIEW_COL6) ) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn,viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, "ID", "COL1", "COL2", "COL3", "VIEW_COL1", "VIEW_COL2", 
"VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
+            assertTableDefinition(viewConn,viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, true, "ID", "COL1", "COL2", "COL3", "VIEW_COL1", 
"VIEW_COL2", "VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
             
             // upsert single row into view
             String dml = "UPSERT INTO " + viewOfTable + " VALUES(?,?,?,?,?, ?, 
?, ?, ?, ?)";
@@ -328,8 +336,8 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             }
             
             // validate that there were no columns added to the table or view, 
if its table is column encoded the sequence number changes when we increment 
the cq counter
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 1 : 0, 4, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2", "COL3");
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, "ID", "COL1", "COL2", "COL3", "VIEW_COL1", "VIEW_COL2", 
"VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 1 : 0, 4, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, 
"ID", "COL1", "COL2", "COL3");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, true, "ID", "COL1", "COL2", "COL3", "VIEW_COL1", 
"VIEW_COL2", "VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
             
             if (columnEncoded) {
                 try {
@@ -343,9 +351,9 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             else {
                 // should succeed 
                 conn.createStatement().execute("ALTER TABLE " + tableName + " 
ADD VIEW_COL4 DECIMAL, VIEW_COL2 VARCHAR(256)");
-                assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 6, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", 
"COL1", "COL2", "COL3", "VIEW_COL4", "VIEW_COL2");
+                assertTableDefinition(conn, tableName, PTableType.TABLE, null, 
columnEncoded ? 2 : 1, 6, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, false, 
"ID", "COL1", "COL2", "COL3", "VIEW_COL4", "VIEW_COL2");
                 // even though we added columns to the base table, the view 
metadata remains the same as the base table metadata changes are no longer 
propagated to the child view
-                assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, "ID", "COL1", "COL2", "COL3", "VIEW_COL1", "VIEW_COL2", 
"VIEW_COL3", "VIEW_COL4", "VIEW_COL5", "VIEW_COL6");
+                assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 10, 4, true, "ID", "COL1", "COL2", "COL3", "VIEW_COL4", 
"VIEW_COL2", "VIEW_COL1", "VIEW_COL3", "VIEW_COL5", "VIEW_COL6");
                 
                 // query table
                 ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
@@ -376,7 +384,7 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                 // the base column count and ordinal positions of columns is 
updated in the ptable (at read time) 
                 PName tenantId = isMultiTenant ? PNameFactory.newName(TENANT1) 
: null;
                 PTable view = 
viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, 
viewOfTable));
-                assertEquals(isMultiTenant ? 5: 4, view.getBaseColumnCount());
+                assertBaseColumnCount(4, view.getBaseColumnCount());
                 assertColumnsMatch(view.getColumns(), "ID", "COL1", "COL2", 
"COL3", "VIEW_COL4", "VIEW_COL2", "VIEW_COL1", "VIEW_COL3", "VIEW_COL5", 
"VIEW_COL6");
             }
         } 
@@ -398,14 +406,10 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
                             + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
-            PTable table = PhoenixRuntime.getTableNoCache(conn, 
tableName.toUpperCase());
-            assertColumnsMatch(table.getColumns(), "ID", "COL1", "COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR(256) CONSTRAINT pk PRIMARY KEY 
(VIEW_COL1, VIEW_COL2)) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
-            PTable view = PhoenixRuntime.getTableNoCache(viewConn, 
viewOfTable.toUpperCase());
-            assertColumnsMatch(view.getColumns(), "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             // upsert single row into view
             String dml = "UPSERT INTO " + viewOfTable + " VALUES(?,?,?,?,?)";
@@ -474,9 +478,9 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             
             // add the pk column of the view to the base table
             conn.createStatement().execute("ALTER TABLE " + tableName + " ADD 
VIEW_COL1 DECIMAL(10,2) PRIMARY KEY, VIEW_COL2 VARCHAR(256) PRIMARY KEY");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, false, "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
             // even though we added columns to the base table, the sequence 
number and base column count is not updated in the view metadata (in 
SYSTEM.CATALOG)
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
tableName, 0,  5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             // query table
             ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
@@ -500,8 +504,9 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             
             // the base column count is updated in the ptable
             PName tenantId = isMultiTenant ? PNameFactory.newName(TENANT1) : 
null;
+            PTable view = PhoenixRuntime.getTableNoCache(viewConn, 
viewOfTable.toUpperCase());
             view = viewConn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(tenantId, viewOfTable));
-            assertEquals(isMultiTenant ? 4 : 3, view.getBaseColumnCount());
+            assertBaseColumnCount(3, view.getBaseColumnCount());
         } 
     }
     
@@ -520,16 +525,16 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
                             + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable1 + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR(256) CONSTRAINT pk PRIMARY KEY 
(VIEW_COL1, VIEW_COL2)) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable2 + 
" ( VIEW_COL3 VARCHAR(256), VIEW_COL4 DECIMAL(10,2) CONSTRAINT pk PRIMARY KEY 
(VIEW_COL3, VIEW_COL4)) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL3", "VIEW_COL4");
+            assertTableDefinition(viewConn, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL3", "VIEW_COL4");
             
             try {
-                // should fail because there are two view with different pk 
columns
+                // should fail because there are two views with different pk 
columns
                 conn.createStatement().execute("ALTER TABLE " + tableName + " 
ADD VIEW_COL1 DECIMAL(10,2) PRIMARY KEY, VIEW_COL2 VARCHAR(256) PRIMARY KEY");
                 fail();
             }
@@ -585,13 +590,13 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                     + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, COL2)"
                     + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 0, 
3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable1 + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR(256) CONSTRAINT pk PRIMARY KEY 
(VIEW_COL1, VIEW_COL2)) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             viewConn2.createStatement().execute("CREATE VIEW " + viewOfTable2 
+ " ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR(256) CONSTRAINT pk PRIMARY KEY 
(VIEW_COL1, VIEW_COL2)) AS SELECT * FROM " + tableName);
-            assertTableDefinition(viewConn2, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3,  "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn2, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3,  true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
 
             // upsert single row into both view
             String dml = "UPSERT INTO " + viewOfTable1 + " VALUES(?,?,?,?,?)";
@@ -641,10 +646,10 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             }
             
             conn.createStatement().execute("ALTER TABLE " + tableName + " ADD 
VIEW_COL1 DECIMAL(10,2) PRIMARY KEY, VIEW_COL2 VARCHAR(256) PRIMARY KEY");
-            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(conn, tableName, PTableType.TABLE, null, 1, 
5, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, false, "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
             // even though we added columns to the base table, the sequence 
number and base column count is not updated in the view metadata (in 
SYSTEM.CATALOG)
-            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
-            assertTableDefinition(viewConn, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable1, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable2, PTableType.VIEW, 
tableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             // query table
             ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
@@ -674,17 +679,20 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             assertEquals("view5", rs.getString("VIEW_COL2"));
             assertFalse(rs.next());
             
-            // the base column count is updated in the ptable
-            PName tenantId = isMultiTenant ? PNameFactory.newName(TENANT1) : 
null;
-            PTable view1 = 
viewConn.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, 
viewOfTable1));
-            PTable view2 = 
viewConn2.unwrap(PhoenixConnection.class).getTable(new PTableKey(tenantId, 
viewOfTable2));
-            assertEquals(isMultiTenant ? 4 : 3, view1.getBaseColumnCount());
-            assertEquals(isMultiTenant ? 4 : 3, view2.getBaseColumnCount());
+            // the column count is updated in the base table
+            PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, tableName));
+            assertColumnsMatch(table.getColumns(), "ID", "COL1", "COL2", 
"VIEW_COL1", "VIEW_COL2");
         }
     }
     
-    public void assertTableDefinition(Connection conn, String fullTableName, 
PTableType tableType, String parentTableName, int sequenceNumber, int 
columnCount, int baseColumnCount, String... columnNames) throws Exception {
-        int delta = isMultiTenant ? 1 : 0;
+    public void assertTableDefinition(Connection conn, String fullTableName, 
PTableType tableType, String parentTableName, int sequenceNumber, int 
columnCount, int baseColumnCount, boolean offsetCountsForSaltedTables, 
String... columnNames) throws Exception {
+        int delta= 0;
+        delta += isMultiTenant ? 1 : 0;
+        // when we create a salted table we include the salt num in the column 
count, but after we
+        // add or drop a column we don't include the salted table in the 
column count, so if a table
+            // is salted take this into account for the column count but not the 
base column count
+        if (offsetCountsForSaltedTables)
+            delta += salted ? 1 : 0;
         String[] cols;
         if (isMultiTenant && tableType!=PTableType.VIEW) {
             cols = (String[])ArrayUtils.addAll(new String[]{"TENANT_ID"}, 
columnNames);
@@ -696,11 +704,20 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             baseColumnCount==QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT ? 
baseColumnCount : baseColumnCount +delta, cols);
     }
     
-    public void assertColumnsMatch(List<PColumn> actual, String... expected) {
+    private void assertBaseColumnCount(int expected, int actual) {
+        if (salted) ++expected;
+        if (isMultiTenant) ++expected;
+        assertEquals("Base column count does not match", expected, actual);
+    }
+    
+    private void assertColumnsMatch(List<PColumn> actual, String... expected) {
         List<String> expectedCols = Lists.newArrayList(expected);
         if (isMultiTenant) {
             expectedCols.add(0, "TENANT_ID");
         }
+        if (salted) {
+            expectedCols.add(0, "_SALT");
+        }
         assertEquals(expectedCols, Lists.transform(actual, function));
     }
     
@@ -810,59 +827,82 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
             PTable table = PhoenixRuntime.getTableNoCache(viewConn, view1);
             assertEquals(QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT, 
table.getBaseColumnCount());
             
+            try {
+                viewConn.createStatement().execute("SELECT V2 FROM " + view1);
+                fail("V2 should have been droppped");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
+            }
+            
             // Add a new regular column and pk column  to the base table
             String alterBaseTable = "ALTER TABLE " + baseTable + " ADD V3 
VARCHAR, PK2 VARCHAR PRIMARY KEY";
             conn.createStatement().execute(alterBaseTable);
             
             // Column V3 shouldn't have propagated to the diverged view.
-            String sql = "SELECT V3 FROM " + view1;
             try {
-                viewConn.createStatement().execute(sql);
+                viewConn.createStatement().execute("SELECT V3 FROM " + view1);
+                fail();
             } catch (SQLException e) {
                 assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
             }
             
             // However, column V3 should have propagated to the non-diverged 
view.
-            sql = "SELECT V3 FROM " + view2;
-            viewConn2.createStatement().execute(sql);
+            viewConn2.createStatement().execute("SELECT V3 FROM " + view2);
             
             // PK2 should be in both views
-            sql = "SELECT PK2 FROM " + view1;
-            viewConn.createStatement().execute(sql);
-            sql = "SELECT PK2 FROM " + view2;
-            viewConn2.createStatement().execute(sql);
+            viewConn.createStatement().execute("SELECT PK2 FROM " + view1);
+            viewConn2.createStatement().execute("SELECT PK2 FROM " + view2);
             
             // Drop a column from the base table
             alterBaseTable = "ALTER TABLE " + baseTable + " DROP COLUMN V1";
             conn.createStatement().execute(alterBaseTable);
+
+            // V1 should be dropped from the base table
+            try {
+                conn.createStatement().execute("SELECT V1 FROM " + baseTable);
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
+            }
             
             // V1 should be dropped from both diverged and non-diverged views
-            sql = "SELECT V1 FROM " + view1;
             try {
-                viewConn.createStatement().execute(sql);
+                viewConn2.createStatement().execute("SELECT V1 FROM " + view2);
+                fail();
             } catch (SQLException e) {
                 assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
             }
-            sql = "SELECT V1 FROM " + view2;
             try {
-                viewConn2.createStatement().execute(sql);
+                viewConn.createStatement().execute("SELECT V1 FROM " + view1);
+//              TODO since the view is diverged we can't filter out the parent 
table column metadata
+//              while building the view. After the client stops sending parent 
table column metadata (see PHOENIX-4766)
+//              while creating a view dropping a parent table column will also 
be reflected in a diverged view
+//              fail();
             } catch (SQLException e) {
                 assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
             }
             
             // V0 should be still exist in both diverged and non-diverged views
-            sql = "SELECT V0 FROM " + view1;
-            viewConn.createStatement().execute(sql);
-            sql = "SELECT V0 FROM " + view2;
-            viewConn2.createStatement().execute(sql);
+            viewConn.createStatement().execute("SELECT V0 FROM " + view1);
+            viewConn2.createStatement().execute("SELECT V0 FROM " + view2);
 
-                       // add the column that was dropped back to the view
-                       String addColumn = "ALTER VIEW " + view1 + " ADD V2 
VARCHAR";
-                       viewConn.createStatement().execute(addColumn);
+            // we currently cannot add a column that was dropped back to the 
view because the excluded column
+            // doesn't contain data type information see PHOENIX-4868
+            try {
+                       viewConn.createStatement().execute("ALTER VIEW " + 
view1 + " ADD V2 VARCHAR");
+                fail();
+            } catch (SQLException e) {
+                
assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), 
e.getErrorCode());
+            }
+                       
                        // V2 should not exist in the view
-                       sql = "SELECT V0 FROM " + view1;
-                       viewConn.createStatement().execute(sql);
-        } 
+            try {
+                       viewConn.createStatement().execute("SELECT V2 FROM " + 
view1);
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), 
e.getErrorCode());
+            } 
+        }
     }
     
     @Test
@@ -879,10 +919,10 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                             + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, 
COL2)"
                             + " ) %s";
             conn.createStatement().execute(generateDDL(ddlFormat));
-            assertTableDefinition(conn, baseTableName, PTableType.TABLE, null, 
0, 3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, "ID", "COL1", "COL2");
+            assertTableDefinition(conn, baseTableName, PTableType.TABLE, null, 
0, 3, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, true, "ID", "COL1", "COL2");
             
             viewConn.createStatement().execute("CREATE VIEW " + viewOfTable + 
" ( VIEW_COL1 DECIMAL(10,2), VIEW_COL2 VARCHAR ) AS SELECT * FROM 
"+baseTableName);
-            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
baseTableName, 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
+            assertTableDefinition(viewConn, viewOfTable, PTableType.VIEW, 
baseTableName, 0, 5, 3, true, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             PName tenantId = isMultiTenant ? PNameFactory.newName(TENANT1) : 
null;
             PhoenixConnection phoenixConn = 
conn.unwrap(PhoenixConnection.class);
@@ -1037,6 +1077,10 @@ public class AlterTableWithViewsIT extends 
SplitSystemCatalogIT {
                     .getString());
             assertEquals("Unexpected index ",  fullNameViewIndex2 , 
view.getIndexes().get(1).getName()
                 .getString());
+            assertEquals("Unexpected salt buckets", view.getBucketNum(),
+                view.getIndexes().get(0).getBucketNum());
+            assertEquals("Unexpected salt buckets", view.getBucketNum(),
+                view.getIndexes().get(1).getBucketNum());
             
             // drop two columns
             conn.createStatement().execute("ALTER TABLE " + tableWithView + " 
DROP COLUMN v2, v3 ");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 796c3d9..bfcc729 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -724,7 +724,7 @@ public class ViewIT extends SplitSystemCatalogIT {
                 conn.getMetaData().getPrimaryKeys(null,
                     SchemaUtil.getSchemaNameFromFullName(fullViewName),
                     SchemaUtil.getTableNameFromFullName(fullViewName));
-        assertPKs(rs, new String[] {"K3"});
+        assertPKs(rs, new String[] {"K1", "K2", "K3"});
         
         // sanity check upserts into base table and view
         conn.createStatement().executeUpdate("upsert into " + fullTableName + 
" (k1, k2, v1) values (1, 1, 1)");
@@ -753,12 +753,14 @@ public class ViewIT extends SplitSystemCatalogIT {
         ddl = "CREATE VIEW " + fullViewName + "(v2 VARCHAR, k3 VARCHAR, k4 
INTEGER NOT NULL, CONSTRAINT PKVEW PRIMARY KEY (k3, k4)) AS SELECT * FROM " + 
fullTableName + " WHERE K1 = 1";
         conn.createStatement().execute(ddl);
         
+        PTable view = PhoenixRuntime.getTableNoCache(conn, fullViewName);
+        
         // assert PK metadata
         ResultSet rs =
                 conn.getMetaData().getPrimaryKeys(null,
                     SchemaUtil.getSchemaNameFromFullName(fullViewName),
                     SchemaUtil.getTableNameFromFullName(fullViewName));
-        assertPKs(rs, new String[] {"K3", "K4"});
+        assertPKs(rs, new String[] {"K1", "K2", "K3", "K4"});
     }
     
     @Test
@@ -778,7 +780,7 @@ public class ViewIT extends SplitSystemCatalogIT {
 
         // assert PK metadata
         ResultSet rs = conn.getMetaData().getPrimaryKeys(null, SCHEMA2, 
viewName);
-        assertPKs(rs, new String[] {"K3", "K4"});
+        assertPKs(rs, new String[] {"K1", "K2", "K3", "K4"});
     }
     
     @Test
@@ -1005,8 +1007,8 @@ public class ViewIT extends SplitSystemCatalogIT {
                         + tableName + " WHERE KEY_PREFIX = 'ab4' ");
 
                 // upsert rows
-                upsertRows(viewName1, tenantConn);
-                upsertRows(viewName2, tenantConn);
+                upsertRows(tableName, viewName1, tenantConn);
+                upsertRows(tableName, viewName2, tenantConn);
 
                 // run queries
                 String[] whereClauses =
@@ -1123,7 +1125,7 @@ public class ViewIT extends SplitSystemCatalogIT {
         }
     }
 
-    private void upsertRows(String viewName1, Connection tenantConn) throws 
SQLException {
+    private void upsertRows(String tableName, String viewName1, Connection 
tenantConn) throws SQLException, IOException {
         tenantConn.createStatement().execute("UPSERT INTO " + viewName1
                 + " (pk1, pk2, col1, col3) VALUES ('testa', 'testb', 
TO_DATE('2017-10-16 22:00:00', 'yyyy-MM-dd HH:mm:ss'), 10)");
         tenantConn.createStatement().execute("UPSERT INTO " + viewName1

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index cf5828d..83c7f4d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -249,7 +249,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
@@ -417,6 +416,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final KeyValue COLUMN_DEF_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_DEF_BYTES);
     private static final KeyValue IS_ROW_TIMESTAMP_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
IS_ROW_TIMESTAMP_BYTES);
     private static final KeyValue COLUMN_QUALIFIER_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_QUALIFIER_BYTES);
+    // this key value is used to represent a column derived from a parent that 
was deleted (by
+    // storing a value of LinkType.EXCLUDED_COLUMN)
+    private static final KeyValue LINK_TYPE_KV =
+            createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
LINK_TYPE_BYTES);
 
     private static final List<KeyValue> COLUMN_KV_COLUMNS = 
Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
@@ -431,7 +434,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             IS_VIEW_REFERENCED_KV,
             COLUMN_DEF_KV,
             IS_ROW_TIMESTAMP_KV,
-            COLUMN_QUALIFIER_KV
+            COLUMN_QUALIFIER_KV,
+            LINK_TYPE_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -449,7 +453,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final int COLUMN_DEF_INDEX = 
COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = 
COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
     private static final int COLUMN_QUALIFIER_INDEX = 
COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
+    // the index of the key value is used to represent a column derived from a 
parent that was
+    // deleted (by storing a value of LinkType.EXCLUDED_COLUMN)
+    private static final int EXCLUDED_COLUMN_LINK_TYPE_KV_INDEX =
+            COLUMN_KV_COLUMNS.indexOf(LINK_TYPE_KV);
 
+    // index for link type key value that is used to store linking rows
     private static final int LINK_TYPE_INDEX = 0;
 
     private static final KeyValue CLASS_NAME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
CLASS_NAME_BYTES);
@@ -496,6 +505,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final int DEFAULT_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
     private static final int MIN_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
     private static final int MAX_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
+    
+    private static PName newPName(byte[] buffer) {
+        return buffer==null ? null : newPName(buffer, 0, buffer.length);
+    }
 
     private static PName newPName(byte[] keyBuffer, int keyOffset, int 
keyLength) {
         if (keyLength <= 0) {
@@ -725,9 +738,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         List<PColumn> excludedColumns = Lists.newArrayList();
         // add my own columns first in reverse order
         List<PColumn> myColumns = table.getColumns();
-        // skip salted column as it will be added from the base table columns
-        int startIndex = table.getBucketNum() != null ? 1 : 0;
-        for (int i = myColumns.size() - 1; i >= startIndex; i--) {
+        // skip salted column as it will be created automatically
+        myColumns = myColumns.subList(isSalted ? 1 : 0, myColumns.size());
+        for (int i = myColumns.size() - 1; i >= 0; i--) {
             PColumn pColumn = myColumns.get(i);
             if (pColumn.isExcluded()) {
                 excludedColumns.add(pColumn);
@@ -804,17 +817,17 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 if (hasIndexId) {
                     // add all pk columns of parent tables to indexes
                     // skip salted column as it will be added from the base 
table columns
-                    startIndex = pTable.getBucketNum() != null ? 1 : 0;
+                    int startIndex = pTable.getBucketNum() != null ? 1 : 0;
                     for (int index=startIndex; 
index<pTable.getPKColumns().size(); index++) {
-                        PColumn column = pTable.getPKColumns().get(index);
+                        PColumn pkColumn = pTable.getPKColumns().get(index);
                         // don't add the salt column of ancestor tables for 
view indexes
-                        if (column.equals(SaltingUtil.SALTING_COLUMN) || 
column.isExcluded()) {
+                        if (pkColumn.equals(SaltingUtil.SALTING_COLUMN) || 
pkColumn.isExcluded()) {
                             continue;
                         }
-                        column = IndexUtil.getIndexPKColumn(++numPKCols, 
column);
-                        int existingColumnIndex = allColumns.indexOf(column);
+                        pkColumn = IndexUtil.getIndexPKColumn(++numPKCols, 
pkColumn);
+                        int existingColumnIndex = allColumns.indexOf(pkColumn);
                         if (existingColumnIndex == -1) {
-                            allColumns.add(0, column);
+                            allColumns.add(0, pkColumn);
                         }
                     }
                     for (int j = 0; j < pTable.getColumns().size(); j++) {
@@ -832,6 +845,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                 } else {
                     List<PColumn> currAncestorTableCols = 
PTableImpl.getColumnsToClone(pTable);
                     if (currAncestorTableCols != null) {
+                        // add the ancestor columns in reverse order so that 
the final column list
+                        // contains ancestor columns and then the view columns 
in the right order
                         for (int j = currAncestorTableCols.size() - 1; j >= 0; 
j--) {
                             PColumn column = currAncestorTableCols.get(j);
                             // for diverged views we always include pk columns 
of the base table. We
@@ -861,10 +876,15 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                             } else {
                                 int existingColumnIndex = 
allColumns.indexOf(column);
                                 if (existingColumnIndex != -1) {
-                                    // if the same column exists in a parent 
and child, we keep the
-                                    // latest column
+                                    // for diverged views if the view was 
created before
+                                    // PHOENIX-3534 the parent table columns 
will be present in the
+                                    // view PTable (since the base column 
count is
+                                    // 
QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT we can't
+                                    // filter them out) so we always pick the 
parent column  
+                                    // for non diverged views if the same 
column exists in a parent
+                                    // and child, we keep the latest column
                                     PColumn existingColumn = 
allColumns.get(existingColumnIndex);
-                                    if (column.getTimestamp() > 
existingColumn.getTimestamp()) {
+                                    if (isDiverged || column.getTimestamp() > 
existingColumn.getTimestamp()) {
                                         allColumns.remove(existingColumnIndex);
                                         allColumns.add(column);
                                     }
@@ -892,8 +912,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                 }
             }
         }
-        // lets remove the excluded columns first if the timestamp is newer 
than
-        // the added column
+        // remove the excluded columns if the timestamp is newer than the 
added column
         for (PColumn excludedColumn : excludedColumns) {
             int index = allColumns.indexOf(excludedColumn);
             if (index != -1) {
@@ -904,27 +923,26 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         }
         List<PColumn> columnsToAdd = Lists.newArrayList();
         int position = isSalted ? 1 : 0;
+        // allColumns contains the columns in the reverse order
         for (int i = allColumns.size() - 1; i >= 0; i--) {
             PColumn column = allColumns.get(i);
             if (table.getColumns().contains(column)) {
                 // for views this column is not derived from an ancestor
-                columnsToAdd.add(new PColumnImpl(column, position));
+                columnsToAdd.add(new PColumnImpl(column, position++));
             } else {
-                columnsToAdd.add(new PColumnImpl(column, true, position));
+                columnsToAdd.add(new PColumnImpl(column, true, position++));
             }
-            position++;
         }
-        // need to have the columns in the PTable to use the WhereCompiler
-        // unfortunately so this needs to be done
-        // twice....
-        // TODO set the view properties correctly instead of just setting them
-        // same as the base table
+        // we need to include the salt column when setting the base table 
column count in order to
+        // maintain b/w compatibility
         int baseTableColumnCount =
                 isDiverged ? QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT
-                        : columnsToAdd.size() - myColumns.size();
+                        : columnsToAdd.size() - myColumns.size() + (isSalted ? 
1 : 0);
+        // TODO Implement PHOENIX-4763 to set the view properties correctly 
instead of just
+        // setting them same as the base table
         PTableImpl pTable =
                 PTableImpl.makePTable(table, baseTable, columnsToAdd, 
maxTableTimestamp,
-                    baseTableColumnCount);
+                    baseTableColumnCount, excludedColumns);
         return WhereConstantParser.addViewInfoToPColumnsIfNeeded(pTable);
     }
 
@@ -1057,7 +1075,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     }
 
     private void addColumnToTable(List<Cell> results, PName colName, PName 
famName,
-        Cell[] colKeyValues, List<PColumn> columns, boolean isSalted) {
+            Cell[] colKeyValues, List<PColumn> columns, boolean isSalted, int 
baseColumnCount,
+            boolean isRegularView) {
         int i = 0;
         int j = 0;
         while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
@@ -1082,7 +1101,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             throw new IllegalStateException("Didn't find all required key 
values in '"
                     + colName.getString() + "' column metadata row");
         }
-
+        
         Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
         Integer maxLength =
                 columnSizeKv == null ? null : 
PInteger.INSTANCE.getCodec().decodeInt(
@@ -1094,7 +1113,37 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
         int position =
             
PInteger.INSTANCE.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
-                    ordinalPositionKv.getValueOffset(), 
SortOrder.getDefault()) + (isSalted ? 1 : 0);
+                    ordinalPositionKv.getValueOffset(), 
SortOrder.getDefault()) + (isSalted ? 1 : 0);;
+
+        // Prior to PHOENIX-4766 we were sending the parent table column 
metadata while creating a
+        // child view, now that we combine columns by resolving the parent 
table hierarchy we
+        // don't need to include the parent table column while loading the 
PTable of the view
+        if (isRegularView && position <= baseColumnCount) {
+            return;
+        }
+        
+        // if this column was inherited from a parent and was dropped, then we 
create an excluded
+        // column, this check is only needed to handle view metadata that was 
created before
+        // PHOENIX-4766 where we were sending the parent table column metadata 
when creating a
+        // child view
+        Cell excludedColumnKv = 
colKeyValues[EXCLUDED_COLUMN_LINK_TYPE_KV_INDEX];
+        if (excludedColumnKv != null && colKeyValues[DATA_TYPE_INDEX]
+                .getTimestamp() <= excludedColumnKv.getTimestamp()) {
+            LinkType linkType =
+                    LinkType.fromSerializedValue(
+                        
excludedColumnKv.getValueArray()[excludedColumnKv.getValueOffset()]);
+            if (linkType == LinkType.EXCLUDED_COLUMN) {
+                addExcludedColumnToTable(columns, colName, famName, 
excludedColumnKv.getTimestamp());
+            } else {
+                // if we have a column metadata row that has a link type 
keyvalue it should
+                // represent an excluded column by containing the 
LinkType.EXCLUDED_COLUMN
+                throw new IllegalStateException(
+                        "Link type should be EXCLUDED_COLUMN but found an 
unxpected link type for key value "
+                                + excludedColumnKv);
+            }
+            return;
+        }
+        
         Cell nullableKv = colKeyValues[NULLABLE_INDEX];
         boolean isNullable =
             PInteger.INSTANCE.getCodec().decodeInt(nullableKv.getValueArray(),
@@ -1136,8 +1185,11 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 Arrays.copyOfRange(columnQualifierKV.getValueArray(),
                     columnQualifierKV.getValueOffset(), 
columnQualifierKV.getValueOffset()
                             + columnQualifierKV.getValueLength()) : 
(isPkColumn ? null : colName.getBytes());
-        PColumn column = new PColumnImpl(colName, famName, dataType, 
maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, 
isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes,
-            results.get(0).getTimestamp());
+        PColumn column =
+                new PColumnImpl(colName, famName, dataType, maxLength, scale, 
isNullable,
+                        position - 1, sortOrder, arraySize, viewConstant, 
isViewReferenced,
+                        expressionStr, isRowTimestamp, false, 
columnQualifierBytes,
+                        results.get(0).getTimestamp());
         columns.add(column);
     }
 
@@ -1427,7 +1479,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                   addExcludedColumnToTable(columns, colName, famName, 
colKv.getTimestamp());
               }
           } else {
-              addColumnToTable(results, colName, famName, colKeyValues, 
columns, saltBucketNum != null);
+              boolean isRegularView = (tableType == PTableType.VIEW && 
viewType!=ViewType.MAPPED);
+              addColumnToTable(results, colName, famName, colKeyValues, 
columns, saltBucketNum != null, baseColumnCount, isRegularView);
           }
         }
         // Avoid querying the stats table because we're holding the rowLock 
here. Issuing an RPC to a remote
@@ -1911,42 +1964,6 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 dropChildMetadata(schemaName, tableName, tenantIdBytes);
             }
             
-            // Here we are passed the parent's columns to add to a view, 
PHOENIX-3534 allows for a splittable
-            // System.Catalog thus we only store the columns that are new to 
the view, not the parents columns,
-            // thus here we remove everything that is ORDINAL.POSITION <= 
baseColumnCount and update the
-            // ORDINAL.POSITIONS to be shifted accordingly.
-            // TODO PHOENIX-4767 remove the following code that removes the 
base table column metadata in the next release 
-            if (PTableType.VIEW.equals(tableType) && 
!ViewType.MAPPED.equals(viewType)) {
-                boolean isSalted = MetaDataUtil.getSaltBuckets(tableMetadata, 
GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable()) > 0;
-                int baseColumnCount = 
MetaDataUtil.getBaseColumnCount(tableMetadata) - (isSalted ? 1 : 0);
-                if (baseColumnCount > 0) {
-                    Iterator<Mutation> mutationIterator = 
tableMetadata.iterator();
-                    while (mutationIterator.hasNext()) {
-                        Mutation mutation = mutationIterator.next();
-                        // if not null and ordinal position < base column 
count remove this mutation
-                        ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
-                        MetaDataUtil.getMutationValue(mutation, 
PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES,
-                            GenericKeyValueBuilder.INSTANCE, ptr);
-                        if (MetaDataUtil.getMutationValue(mutation, 
PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES,
-                            GenericKeyValueBuilder.INSTANCE, ptr)) {
-                            int ordinalValue = 
PInteger.INSTANCE.getCodec().decodeInt(ptr, SortOrder.ASC);
-                            if (ordinalValue <= baseColumnCount) {
-                                mutationIterator.remove();
-                            } else {
-                                if (mutation instanceof Put) {
-                                    byte[] ordinalPositionBytes = new 
byte[PInteger.INSTANCE.getByteSize()];
-                                    int newOrdinalValue = ordinalValue - 
baseColumnCount;
-                                    PInteger.INSTANCE.getCodec()
-                                        .encodeInt(newOrdinalValue, 
ordinalPositionBytes, 0);
-                                    byte[] family = 
Iterables.getOnlyElement(mutation.getFamilyCellMap().keySet());
-                                    MetaDataUtil.mutatePutValue((Put) 
mutation, family, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, 
ordinalPositionBytes);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-
             byte[] parentTableKey = null;
             Mutation viewPhysicalTableRow = null;
             Set<TableName> indexes = new HashSet<TableName>();;
@@ -2931,6 +2948,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
         List<Put> columnPutsForBaseTable =
                 Lists.newArrayListWithExpectedSize(tableMetadata.size());
+        boolean salted = basePhysicalTable.getBucketNum()!=null;
         // Isolate the puts relevant to adding columns 
         for (Mutation m : tableMetadata) {
             if (m instanceof Put) {
@@ -2970,9 +2988,15 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 return new 
MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, 
EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
             }
             
-            //add the new columns to the child view
+            // add the new columns to the child view
             List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
-            boolean addingExistingPkCol = false;
+            // remove salted column
+            if (salted) {
+                viewPkCols.remove(0);
+            }
+            // remove pk columns that are present in the parent
+            viewPkCols.removeAll(basePhysicalTable.getPKColumns());
+            boolean addedPkColumn = false;
             for (Put columnToBeAdded : columnPutsForBaseTable) {
                 PColumn existingViewColumn = null;
                 byte[][] rkmd = new byte[5][];
@@ -2993,7 +3017,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     // ignore since it means the column is not present in the 
view
                 }
 
-                boolean isColumnToBeAddPkCol = columnFamily == null;
+                boolean isCurrColumnToBeAddPkCol = columnFamily == null;
+                addedPkColumn |= isCurrColumnToBeAddPkCol;
                 if (existingViewColumn != null) {
                     if 
(EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable)
                             && !SchemaUtil.isPKColumn(existingViewColumn)) {
@@ -3057,7 +3082,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
 
                     // if the column to be added to the base table is a pk 
column, then we need to
                     // validate that the key slot position is the same
-                    if (isColumnToBeAddPkCol) {
+                    if (isCurrColumnToBeAddPkCol) {
                         List<Cell> keySeqCells =
                                 
columnToBeAdded.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                     PhoenixDatabaseMetaData.KEY_SEQ_BYTES);
@@ -3066,10 +3091,13 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                             int keySeq =
                                     
PSmallint.INSTANCE.getCodec().decodeInt(cell.getValueArray(),
                                         cell.getValueOffset(), 
SortOrder.getDefault());
+                            // we need to take into account the columns 
inherited from the base table
+                            // if the table is salted we don't include the 
salted column (which is
+                            // present in getPKColumns())
                             int pkPosition =
                                     basePhysicalTable.getPKColumns().size()
-                                            + SchemaUtil.getPKPosition(view, 
existingViewColumn)
-                                            + 1;
+                                            + SchemaUtil.getPKPosition(view, 
existingViewColumn) + 1
+                                            - (salted ? 2 : 0); 
                             if (pkPosition != keySeq) {
                                 return new MetaDataMutationResult(
                                         MutationCode.UNALLOWED_TABLE_MUTATION,
@@ -3079,9 +3107,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                         }
                     }
                 }
-                if (isColumnToBeAddPkCol) {
+                if (existingViewColumn!=null && isCurrColumnToBeAddPkCol) {
                     viewPkCols.remove(existingViewColumn);
-                    addingExistingPkCol = true;
                 }
             }
             /*
@@ -3089,7 +3116,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
              * the same as the base table pk columns 2. if we are adding all 
the existing view pk
              * columns to the base table
              */
-            if (addingExistingPkCol && !viewPkCols.isEmpty()) {
+            if (addedPkColumn && !viewPkCols.isEmpty()) {
                 return new 
MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                         EnvironmentEdgeManager.currentTimeMillis(), 
basePhysicalTable);
             }
@@ -3268,16 +3295,27 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                                 && Bytes.compareTo(tableName, 
rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                             try {
                                 addingCol = true;
-                                if (pkCount > FAMILY_NAME_INDEX
-                                        && 
rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
+                                byte[] familyName = null;
+                                byte[] colName = null;
+                                if (pkCount > FAMILY_NAME_INDEX) {
+                                    familyName = 
rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
+                                }
+                                if (pkCount > COLUMN_NAME_INDEX) {
+                                    colName = 
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
+                                }
+                                if (table.getExcludedColumns().contains(
+                                    
PColumnImpl.createExcludedColumn(newPName(familyName), newPName(colName), 0l))) 
{
+                                    // if this column was previously dropped 
in a view, do not allow adding the column back
+                                    return new MetaDataMutationResult(
+                                            
MutationCode.UNALLOWED_TABLE_MUTATION, 
EnvironmentEdgeManager.currentTimeMillis(), null);
+                                }
+                                if (familyName!=null && familyName.length > 0) 
{
                                     PColumnFamily family =
-                                            
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    
family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
-                                } else if (pkCount > COLUMN_NAME_INDEX
-                                        && 
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
+                                            table.getColumnFamily(familyName);
+                                            
family.getPColumnForColumnNameBytes(colName);
+                                } else if (colName!=null && colName.length > 
0) {
                                     addingPKColumn = true;
-                                    table.getPKColumn(new String(
-                                            
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
+                                    table.getPKColumn(new String(colName));
                                 } else {
                                     continue;
                                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 1b5760d..18c9000 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -26,6 +26,7 @@ import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -491,6 +492,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
         buf.append(buf.length() == 0 ? "" : " and ");
     }
     
+    // While creating the PColumns we don't care about the ordinal position 
so set it to 1
     private static final PColumnImpl TENANT_ID_COLUMN = new 
PColumnImpl(PNameFactory.newName(TENANT_ID),
             PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, 
null, false, 1, SortOrder.getDefault(),
             0, null, false, null, false, false, DATA_TYPE_BYTES, 
HConstants.LATEST_TIMESTAMP);
@@ -578,6 +580,14 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     private static final PColumnImpl KEY_SEQ_COLUMN = new 
PColumnImpl(PNameFactory.newName(KEY_SEQ),
             PNameFactory.newName(TABLE_FAMILY_BYTES), PSmallint.INSTANCE, 
null, null, false, 1, SortOrder.getDefault(),
             0, null, false, null, false, false, KEY_SEQ_BYTES, 
HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl PK_NAME_COLUMN = new 
PColumnImpl(PNameFactory.newName(PK_NAME),
+        PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, 
null, false, 1, SortOrder.getDefault(),
+        0, null, false, null, false, false, PK_NAME_BYTES, 
HConstants.LATEST_TIMESTAMP);
+    public static final String ASC_OR_DESC = "ASC_OR_DESC";
+    public static final byte[] ASC_OR_DESC_BYTES = Bytes.toBytes(ASC_OR_DESC);
+    private static final PColumnImpl ASC_OR_DESC_COLUMN = new 
PColumnImpl(PNameFactory.newName(ASC_OR_DESC),
+        PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, 
null, false, 1, SortOrder.getDefault(),
+        0, null, false, null, false, false, ASC_OR_DESC_BYTES, 
HConstants.LATEST_TIMESTAMP);
     
     private static final List<PColumnImpl> PK_DATUM_LIST = 
Lists.newArrayList(TENANT_ID_COLUMN, TABLE_SCHEM_COLUMN, TABLE_NAME_COLUMN, 
COLUMN_NAME_COLUMN);
     
@@ -647,6 +657,43 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
                             new KeyValueColumnExpression(KEY_SEQ_COLUMN), 
false)
                     ), 0, true);
     
+    private static final RowProjector GET_PRIMARY_KEYS_ROW_PROJECTOR =
+            new RowProjector(
+                    Arrays.<ColumnProjector> asList(
+                        new ExpressionProjector(TABLE_CAT, SYSTEM_CATALOG,
+                                new RowKeyColumnExpression(TENANT_ID_COLUMN,
+                                        new RowKeyValueAccessor(PK_DATUM_LIST, 
0)),
+                                false),
+                        new ExpressionProjector(TABLE_SCHEM, SYSTEM_CATALOG,
+                                new RowKeyColumnExpression(TABLE_SCHEM_COLUMN,
+                                        new RowKeyValueAccessor(PK_DATUM_LIST, 
1)),
+                                false),
+                        new ExpressionProjector(TABLE_NAME, SYSTEM_CATALOG,
+                                new RowKeyColumnExpression(TABLE_NAME_COLUMN,
+                                        new RowKeyValueAccessor(PK_DATUM_LIST, 
2)),
+                                false),
+                        new ExpressionProjector(COLUMN_NAME, SYSTEM_CATALOG,
+                                new RowKeyColumnExpression(COLUMN_NAME_COLUMN,
+                                        new RowKeyValueAccessor(PK_DATUM_LIST, 
3)),
+                                false),
+                        new ExpressionProjector(KEY_SEQ, SYSTEM_CATALOG,
+                                new KeyValueColumnExpression(KEY_SEQ_COLUMN), 
false),
+                        new ExpressionProjector(PK_NAME, SYSTEM_CATALOG,
+                                new KeyValueColumnExpression(PK_NAME_COLUMN), 
false),
+                        new ExpressionProjector(ASC_OR_DESC, SYSTEM_CATALOG,
+                                new 
KeyValueColumnExpression(ASC_OR_DESC_COLUMN), false),
+                        new ExpressionProjector(DATA_TYPE, SYSTEM_CATALOG,
+                                new 
KeyValueColumnExpression(DATA_TYPE_COLUMN), false),
+                        new ExpressionProjector(TYPE_NAME, SYSTEM_CATALOG,
+                                new 
KeyValueColumnExpression(TYPE_NAME_COLUMN), false),
+                        new ExpressionProjector(COLUMN_SIZE, SYSTEM_CATALOG,
+                                new 
KeyValueColumnExpression(COLUMN_SIZE_COLUMN), false),
+                        new ExpressionProjector(TYPE_ID, SYSTEM_CATALOG,
+                                new KeyValueColumnExpression(TYPE_ID_COLUMN), 
false),
+                        new ExpressionProjector(VIEW_CONSTANT, SYSTEM_CATALOG,
+                                new 
KeyValueColumnExpression(VIEW_CONSTANT_COLUMN), false)),
+                    0, true);
+    
     private boolean match(String str, String pattern) throws SQLException {
         LiteralExpression strExpr = LiteralExpression.newConstant(str, 
PVarchar.INSTANCE, SortOrder.ASC);
         LiteralExpression patternExpr = LiteralExpression.newConstant(pattern, 
PVarchar.INSTANCE, SortOrder.ASC);
@@ -686,10 +733,12 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
             String tableName = rs.getString(TABLE_NAME);
             String tenantId = rs.getString(TABLE_CAT);
             String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
-            PTable table = PhoenixRuntime.getTable(connection, fullTableName);
+            PTable table = PhoenixRuntime.getTableNoCache(connection, 
fullTableName);
             boolean isSalted = table.getBucketNum()!=null;
             boolean tenantColSkipped = false;
-            for (PColumn column : table.getColumns()) {
+            List<PColumn> columns = table.getColumns();
+            columns = Lists.newArrayList(columns.subList(isSalted ? 1 : 0, 
columns.size()));
+            for (PColumn column : columns) {
                 if (isTenantSpecificConnection && 
column.equals(table.getPKColumns().get(0))) {
                     // skip the tenant column
                     tenantColSkipped = true;
@@ -1080,33 +1129,88 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     }
 
     @Override
-    public ResultSet getPrimaryKeys(String catalog, String schema, String 
table) throws SQLException {
-        if (table == null || table.length() == 0) {
+    public ResultSet getPrimaryKeys(String catalog, String schemaName, String 
tableName)
+            throws SQLException {
+        if (tableName == null || tableName.length() == 0) {
             return emptyResultSet;
         }
-        StringBuilder buf = new StringBuilder("select \n" +
-                TENANT_ID + " " + TABLE_CAT + "," + // use catalog for 
tenant_id
-                TABLE_SCHEM + "," +
-                TABLE_NAME + " ," +
-                COLUMN_NAME + "," +
-                KEY_SEQ + "," +
-                PK_NAME + "," +
-                "CASE WHEN " + SORT_ORDER + " = " + 
(SortOrder.DESC.getSystemValue()) + " THEN 'D' ELSE 'A' END ASC_OR_DESC," +
-                ExternalSqlTypeIdFunction.NAME + "(" + DATA_TYPE + ") AS " + 
DATA_TYPE + "," +
-                SqlTypeNameFunction.NAME + "(" + DATA_TYPE + ") AS " + 
TYPE_NAME + "," +
-                COLUMN_SIZE + "," +
-                DATA_TYPE + " " + TYPE_ID + "," + // raw type id
-                VIEW_CONSTANT +
-                " from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS +
-                " where ");
-        buf.append(TABLE_SCHEM + (schema == null || schema.length() == 0 ? " 
is null" : " = '" + StringUtil.escapeStringConstant(schema) + "'" ));
-        buf.append(" and " + TABLE_NAME + " = '" + 
StringUtil.escapeStringConstant(table) + "'" );
-        buf.append(" and " + COLUMN_NAME + " is not null");
-        buf.append(" and " + COLUMN_FAMILY + " is null");
-        addTenantIdFilter(buf, catalog);
-        buf.append(" order by " + TENANT_ID + "," + TABLE_SCHEM + "," + 
TABLE_NAME + " ," + COLUMN_NAME);
-        ResultSet rs = 
connection.createStatement().executeQuery(buf.toString());
-        return rs;
+        List<Tuple> tuples = Lists.newArrayListWithExpectedSize(10);
+        ResultSet rs = getTables(catalog, schemaName, tableName, null);
+        while (rs.next()) {
+            String tenantId = rs.getString(TABLE_CAT);
+            String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+            PTable table = PhoenixRuntime.getTableNoCache(connection, 
fullTableName);
+            boolean isSalted = table.getBucketNum() != null;
+            boolean tenantColSkipped = false;
+            List<PColumn> pkColumns = table.getPKColumns();
+            List<PColumn> sorderPkColumns =
+                    Lists.newArrayList(pkColumns.subList(isSalted ? 1 : 0, 
pkColumns.size()));
+            // sort the columns by name
+            Collections.sort(sorderPkColumns, new Comparator<PColumn>(){
+                @Override public int compare(PColumn c1, PColumn c2) {
+                    return 
c1.getName().getString().compareTo(c2.getName().getString());
+                }
+            });
+            
+            for (PColumn column : sorderPkColumns) {
+                String columnName = column.getName().getString();
+                // generate row key
+                // TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME are row 
key columns
+                byte[] rowKey =
+                        SchemaUtil.getColumnKey(tenantId, schemaName, 
tableName, columnName, null);
+
+                // add one cell for each column info
+                List<Cell> cells = Lists.newArrayListWithCapacity(8);
+                // KEY_SEQ_COLUMN
+                byte[] keySeqBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+                int pkPos = pkColumns.indexOf(column);
+                if (pkPos != -1) {
+                    short keySeq =
+                            (short) (pkPos + 1 - (isSalted ? 1 : 0) - 
(tenantColSkipped ? 1 : 0));
+                    keySeqBytes = PSmallint.INSTANCE.toBytes(keySeq);
+                }
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, 
KEY_SEQ_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, keySeqBytes));
+                // PK_NAME
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, 
PK_NAME_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, table.getPKName() != 
null
+                            ? table.getPKName().getBytes() : 
ByteUtil.EMPTY_BYTE_ARRAY));
+                // ASC_OR_DESC
+                char sortOrder = column.getSortOrder() == SortOrder.ASC ? 'A' 
: 'D';
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    ASC_OR_DESC_BYTES, MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    Bytes.toBytes(sortOrder)));
+                // DATA_TYPE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, 
DATA_TYPE_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    
PInteger.INSTANCE.toBytes(column.getDataType().getResultSetSqlType())));
+                // TYPE_NAME
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(TYPE_NAME), 
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    column.getDataType().getSqlTypeNameBytes()));
+                // COLUMN_SIZE
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, 
COLUMN_SIZE_BYTES,
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                        column.getMaxLength() != null
+                                ? 
PInteger.INSTANCE.toBytes(column.getMaxLength())
+                                : ByteUtil.EMPTY_BYTE_ARRAY));
+                // TYPE_ID
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(TYPE_ID), 
MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    
PInteger.INSTANCE.toBytes(column.getDataType().getSqlType())));
+                // VIEW_CONSTANT
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, 
VIEW_CONSTANT_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, 
column.getViewConstant() != null
+                            ? column.getViewConstant() : 
ByteUtil.EMPTY_BYTE_ARRAY));
+                Collections.sort(cells, new CellComparator());
+                Tuple tuple = new MultiKeyValueTuple(cells);
+                tuples.add(tuple);
+            }
+        }
+        return new PhoenixResultSet(new MaterializedResultIterator(tuples),
+                GET_PRIMARY_KEYS_ROW_PROJECTOR,
+                new StatementContext(new PhoenixStatement(connection), false));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 50bb722..bf2bc59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -81,6 +81,11 @@ public class DelegateTable implements PTable {
     public List<PColumn> getColumns() {
         return delegate.getColumns();
     }
+    
+    @Override
+    public List<PColumn> getExcludedColumns() {
+        return delegate.getExcludedColumns();
+    }
 
     @Override
     public List<PColumnFamily> getColumnFamilies() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6c1aa45/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 649f4c6..d4f4ae3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -198,8 +198,6 @@ import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
 import org.apache.phoenix.parse.UseSchemaStatement;
@@ -2456,7 +2454,7 @@ public class MetaDataClient {
                             .build().buildException();
                     }
                     if (tableType == PTableType.VIEW && viewType != 
ViewType.MAPPED) {
-                        throwIfLastPKOfParentIsFixedLength(parent, schemaName, 
tableName, colDef);
+                        throwIfLastPKOfParentIsVariableLength(parent, 
schemaName, tableName, colDef);
                     }
                     if (!pkColumns.add(column)) {
                         throw new ColumnAlreadyExistsException(schemaName, 
tableName, column.getName().getString());
@@ -3391,7 +3389,7 @@ public class MetaDataClient {
                                 }
                             }
                             if (colDef != null && colDef.isPK() && 
table.getType() == VIEW && table.getViewType() != MAPPED) {
-                                
throwIfLastPKOfParentIsFixedLength(getParentOfView(table), schemaName, 
tableName, colDef);
+                                
throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, 
tableName, colDef);
                             }
                             if (colDef != null && colDef.isRowTimestamp()) {
                                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
@@ -3898,11 +3896,21 @@ public class MetaDataClient {
                         Map<String, List<TableRef>> tenantIdTableRefMap = 
Maps.newHashMap();
                         if (result.getSharedTablesToDelete() != null) {
                             for (SharedTableState sharedTableState : 
result.getSharedTablesToDelete()) {
-                                PTableImpl viewIndexTable = new 
PTableImpl(sharedTableState.getTenantId(),
-                                        sharedTableState.getSchemaName(), 
sharedTableState.getTableName(), ts,
-                                        table.getColumnFamilies(), 
sharedTableState.getColumns(),
-                                        sharedTableState.getPhysicalNames(), 
sharedTableState.getViewIndexType(), sharedTableState.getViewIndexId(),
-                                        table.isMultiTenant(), 
table.isNamespaceMapped(), table.getImmutableStorageScheme(), 
table.getEncodingScheme(), table.getEncodedCQCounter(), 
table.useStatsForParallelization());
+                                PTableImpl viewIndexTable =
+                                        new 
PTableImpl(sharedTableState.getTenantId(),
+                                                
sharedTableState.getSchemaName(),
+                                                
sharedTableState.getTableName(), ts,
+                                                table.getColumnFamilies(),
+                                                sharedTableState.getColumns(),
+                                                
sharedTableState.getPhysicalNames(),
+                                                
sharedTableState.getViewIndexType(),
+                                                
sharedTableState.getViewIndexId(),
+                                                table.isMultiTenant(), 
table.isNamespaceMapped(),
+                                                
table.getImmutableStorageScheme(),
+                                                table.getEncodingScheme(),
+                                                table.getEncodedCQCounter(),
+                                                
table.useStatsForParallelization(),
+                                                table.getBucketNum());
                                 TableRef indexTableRef = new 
TableRef(viewIndexTable);
                                 PName indexTableTenantId = 
sharedTableState.getTenantId();
                                 if (indexTableTenantId==null) {
@@ -4119,7 +4127,11 @@ public class MetaDataClient {
         connection.addSchema(result.getSchema());
     }
 
-    private void throwIfLastPKOfParentIsFixedLength(PTable parent, String 
viewSchemaName, String viewName, ColumnDef col) throws SQLException {
+    private void throwIfLastPKOfParentIsVariableLength(PTable parent, String 
viewSchemaName, String viewName, ColumnDef col) throws SQLException {
+        // if the last pk column is variable length then we read all the
+        // bytes of the rowkey without looking for a separator byte see
+        // 
https://issues.apache.org/jira/browse/PHOENIX-978?focusedCommentId=14617847&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14617847
+        // so we cannot add a pk column to a view if the last pk column of the 
parent is variable length
         if (isLastPKVariableLength(parent)) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MODIFY_VIEW_PK)
             .setSchemaName(viewSchemaName)
@@ -4134,10 +4146,8 @@ public class MetaDataClient {
     }
 
     private PTable getParentOfView(PTable view) throws SQLException {
-        //TODO just use view.getParentName().getString() after implementing 
https://issues.apache.org/jira/browse/PHOENIX-2114
-        SelectStatement select = new 
SQLParser(view.getViewStatement()).parseQuery();
-        String parentName = 
SchemaUtil.normalizeFullTableName(select.getFrom().toString().trim());
-        return connection.getTable(new PTableKey(view.getTenantId(), 
parentName));
+        return connection
+                .getTable(new PTableKey(view.getTenantId(), 
view.getParentName().getString()));
     }
 
     public MutationState createSchema(CreateSchemaStatement create) throws 
SQLException {

Reply via email to