This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch 4.x-PHOENIX-6247
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-PHOENIX-6247 by this push:
     new 2279410  PHOENIX-6247 Separating logical and physical table names
2279410 is described below

commit 2279410c72f883619628ca81680a2d01207af9a3
Author: Gokcen Iskender <[email protected]>
AuthorDate: Thu Feb 25 20:05:05 2021 -0800

    PHOENIX-6247 Separating logical and physical table names
    
    Signed-off-by: Gokcen Iskender <[email protected]>
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  |  78 +-
 .../phoenix/end2end/IndexScrutinyToolBaseIT.java   |  13 +-
 .../apache/phoenix/end2end/LogicalTableNameIT.java | 814 +++++++++++++++++++++
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java |  24 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  66 +-
 .../phoenix/iterate/BaseResultIterators.java       |   1 -
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   3 +
 .../mapreduce/index/IndexScrutinyMapper.java       |   2 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  31 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |  10 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  97 ++-
 .../org/apache/phoenix/schema/PMetaDataImpl.java   |  13 +
 .../java/org/apache/phoenix/schema/PTable.java     |  15 +
 .../java/org/apache/phoenix/schema/PTableImpl.java | 299 +++++---
 .../org/apache/phoenix/schema/TableProperty.java   |   6 +
 .../java/org/apache/phoenix/util/MetaDataUtil.java |  19 +-
 .../java/org/apache/phoenix/util/SchemaUtil.java   |   4 +
 .../java/org/apache/phoenix/util/UpgradeUtil.java  |   4 +-
 .../java/org/apache/phoenix/util/ViewUtil.java     |   1 +
 phoenix-core/src/main/protobuf/PTable.proto        |   2 +
 21 files changed, 1344 insertions(+), 160 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 762d613..88e14ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -56,6 +58,7 @@ import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -443,7 +446,80 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
             checkIndexTableIsVerified(indexTableName);
         }
     }
-    
+
+    @Test
+    public void testImportWithDifferentPhysicalName() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexTableName = String.format("%s_IDX", tableName);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexTableName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + fullTableName + "(ID INTEGER NOT NULL 
PRIMARY KEY, "
+                + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE  INDEX " + indexTableName + " ON " + 
fullTableName + "(FIRST_NAME ASC)";
+        stmt.execute(ddl);
+        String newTableName = LogicalTableNameIT.NEW_TABLE_PREFIX + 
generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new 
StringBuilder(tableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), 
Bytes.toBytes(fullNewTableName));
+        }
+        LogicalTableNameIT.renameAndDropPhysicalTable(conn, "NULL", 
schemaName, tableName, newTableName);
+
+        String csvName = "/tmp/input_logical_name.csv";
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path(csvName));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
+        int
+                exitCode =
+                csvBulkLoadTool
+                        .run(new String[] { "--input", csvName, "--table", 
tableName,
+                                "--schema", schemaName,
+                                "--zookeeper", zkQuorum });
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT /*+ NO_INDEX */ id, 
FIRST_NAME FROM " + fullTableName + " where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+        String selectFromIndex = "SELECT FIRST_NAME FROM " + fullTableName + " 
where FIRST_NAME='FirstName 1'";
+        rs = stmt.executeQuery("EXPLAIN " + selectFromIndex);
+        assertTrue(QueryUtil.getExplainPlan(rs).contains(indexTableName));
+        rs = stmt.executeQuery(selectFromIndex);
+        assertTrue(rs.next());
+        assertEquals("FirstName 1", rs.getString(1));
+
+        String csvNameForIndex = "/tmp/input_logical_name_index.csv";
+        // Now just run the tool on the index table and check that the index 
has extra row.
+        outputStream = fs.create(new Path(csvNameForIndex));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("3,FirstName 3,LastName 3");
+        printWriter.close();
+        exitCode = csvBulkLoadTool
+                        .run(new String[] { "--input", csvNameForIndex, 
"--table", tableName,
+                                "--schema", schemaName,
+                                "--index-table", indexTableName, 
"--zookeeper", zkQuorum });
+        assertEquals(0, exitCode);
+        selectFromIndex = "SELECT FIRST_NAME FROM " + fullTableName + " where 
FIRST_NAME='FirstName 3'";
+        rs = stmt.executeQuery("EXPLAIN " + selectFromIndex);
+        assertTrue(QueryUtil.getExplainPlan(rs).contains(indexTableName));
+        rs = stmt.executeQuery(selectFromIndex);
+        assertTrue(rs.next());
+        assertEquals("FirstName 3", rs.getString(1));
+        rs.close();
+        stmt.close();
+
+        checkIndexTableIsVerified(fullIndexTableName);
+    }
+
     @Test
     public void testInvalidArguments() {
         String tableName = "TABLE8";
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index bdefcfd..e0ea133 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -17,6 +17,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -73,7 +74,7 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
         return scrutiny.getJobs();
     }
 
-    protected String[] getArgValues(String schemaName, String dataTable, 
String indxTable, Long batchSize,
+    public String[] getArgValues(String schemaName, String dataTable, String 
indxTable, Long batchSize,
             SourceTable sourceTable, boolean outputInvalidRows, OutputFormat 
outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
@@ -129,6 +130,16 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
         return args.toArray(new String[0]);
     }
 
+    public static List<Job> runScrutinyTool(String schemaName, String 
dataTableName, String indexTableName,
+            Long batchSize, SourceTable sourceTable) throws Exception {
+        IndexScrutinyToolBaseIT it = new IndexScrutinyToolBaseIT();
+        final String[]
+                cmdArgs =
+                it.getArgValues(schemaName, dataTableName, indexTableName, 
batchSize, sourceTable,
+                        false, null, null, null, Long.MAX_VALUE);
+        return it.runScrutiny(IndexScrutinyMapperForTest.class, cmdArgs);
+    }
+
     protected long getCounterValue(Counters counters, 
Enum<PhoenixScrutinyJobCounters> counter) {
         return counters.findCounter(counter).getValue();
     }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
new file mode 100644
index 0000000..15388a8
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
@@ -0,0 +1,814 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.curator.shaded.com.google.common.base.Joiner;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.util.Arrays.asList;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class LogicalTableNameIT extends BaseTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableNameIT.class);
+
+    private final boolean createChildAfterTransform;
+    private final boolean immutable;
+    private String dataTableDdl;
+    public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
+    private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
Integer.toString(60*60*1000)); // An hour
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    public LogicalTableNameIT(boolean createChildAfterTransform, boolean 
immutable)  {
+        this.createChildAfterTransform = createChildAfterTransform;
+        this.immutable = immutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (immutable) {
+            optionBuilder.append(" 
,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+        }
+        this.dataTableDdl = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(
+            name = "createChildAfterTransform={0}, immutable={1}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[] { false, true };
+        for (boolean immutable : Booleans) {
+            for (boolean createAfter : Booleans) {
+                list.add(new Object[] { createAfter, immutable });
+            }
+        }
+
+        return list;
+    }
+
+    private Connection getConnection(Properties props) throws Exception {
+        props.setProperty(QueryServices.DROP_METADATA_ATTRIB, 
Boolean.toString(true));
+        // Force real driver to be used as the test one doesn't handle creating
+        // more than one ConnectionQueryService
+        props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, 
StringUtil.EMPTY_STRING);
+        // Create new ConnectionQueryServices so that we can set 
DROP_METADATA_ATTRIB
+        String url = QueryUtil.getConnectionUrl(props, config, "PRINCIPAL");
+        return DriverManager.getConnection(url, props);
+    }
+
+    private  HashMap<String, ArrayList<String>> 
testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, 
String schemaName,
+                                                                               
        String tableName, String indexName) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        createTable(conn, fullTableName);
+        if (!createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, 
fullTableName, 1, 2);
+
+        // Create another hbase table and add 1 more row
+        String newTableName =  NEW_TABLE_PREFIX + tableName;
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new 
StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), 
Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName)))
 {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"), Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", 
"4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + 
" ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, 
newTableName);
+
+        if (createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = 
testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, 
indexName);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, tableName, 
indexName);
+
+                validateTable(conn, fullTableName);
+                validateTable(conn2, fullTableName);
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // Add row and check
+                populateTable(conn, fullTableName, 10, 1);
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT * 
FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(true, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + 
fullTableName  + " WHERE PK1='PK10'");
+                assertEquals(true, rs.next());
+
+                // Drop row and check
+                conn.createStatement().execute("DELETE from " + fullTableName 
+ " WHERE PK1='PK10'");
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + 
fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(false, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + 
fullTableName  + " WHERE PK1='PK10'");
+                assertEquals(false, rs.next());
+
+                conn2.createStatement().execute("DROP TABLE " + fullTableName);
+                // check that the physical data table is dropped
+                Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                assertEquals(false, 
admin.tableExists(TableName.valueOf(SchemaUtil.getTableName(schemaName,NEW_TABLE_PREFIX
 + tableName))));
+
+                // check that index is dropped
+                assertEquals(false, 
admin.tableExists(TableName.valueOf(fullIndexName)));
+
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex_runScrutiny() throws 
Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                testBaseTableWithIndex_BaseTableChange(conn, conn2, 
schemaName, tableName, indexName);
+
+                List<Job>
+                        completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, 
tableName, indexName, 1L,
+                                
IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+                if (createChildAfterTransform) {
+                    assertEquals(3, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(0, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+                } else {
+                    // Since we didn't build the index, we expect 1 missing 
index row
+                    assertEquals(2, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(1, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+                }
+            }
+        }
+    }
+
+    private  HashMap<String, ArrayList<String>> 
test_IndexTableChange(Connection conn, Connection conn2, String schemaName, 
String tableName, String indexName, byte[] verifiedBytes) throws Exception {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        conn.setAutoCommit(true);
+        createTable(conn, fullTableName);
+        createIndexOnTable(conn, fullTableName, indexName);
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, 
fullTableName, 1, 2);
+
+        // Create another hbase table for index and add 1 more row
+        String newTableName = "NEW_IDXTBL_" + generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getAdmin()) {
+            String snapshotName = new 
StringBuilder(indexName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullIndexName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), 
Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName)))
 {
+                Put
+                        put =
+                        new Put(ByteUtil.concat(Bytes.toBytes("V13"), 
QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES,
+                        verifiedBytes);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("0:V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("0:V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", 
"4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT * FROM " + fullIndexName;
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, indexName, 
newTableName);
+
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalIndexTableName() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = 
test_IndexTableChange(conn, conn2, schemaName, tableName, indexName, 
IndexRegionObserver.VERIFIED_BYTES);
+
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // create another index and drop the first index and validate 
the second one
+                String indexName2 = "IDX2_" + generateUniqueName();
+                String fullIndexName2 = SchemaUtil.getTableName(schemaName, 
indexName2);
+                if (createChildAfterTransform) {
+                    createIndexOnTable(conn2, fullTableName, indexName2);
+                }
+                dropIndex(conn2, fullTableName, indexName);
+                if (!createChildAfterTransform) {
+                    createIndexOnTable(conn2, fullTableName, indexName2);
+                }
+                // The new index doesn't have the new row
+                expected.remove("PK3");
+                validateIndex(conn, fullIndexName2, false, expected);
+                validateIndex(conn2, fullIndexName2, false, expected);
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalIndexTableName_runScrutiny() throws 
Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                test_IndexTableChange(conn, conn2, schemaName, tableName, 
indexName, IndexRegionObserver.VERIFIED_BYTES);
+                List<Job>
+                        completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, 
tableName, indexName, 1L,
+                                
IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+
+                // Since we didn't build the index, we expect 1 missing index 
row
+                assertEquals(2, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                assertEquals(1, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+
+                // Try with unverified bytes
+                String tableName2 = "TBL_" + generateUniqueName();
+                String indexName2 = "IDX_" + generateUniqueName();
+                test_IndexTableChange(conn, conn2, schemaName, tableName2, 
indexName2, IndexRegionObserver.UNVERIFIED_BYTES);
+
+                completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, 
tableName2, indexName2, 1L,
+                                
IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+
+                job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                counters = job.getCounters();
+
+                // Since we didn't build the index, we expect 1 missing index 
row
+                assertEquals(2, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                assertEquals(0, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+
+            }
+        }
+    }
+
+    private HashMap<String, ArrayList<String>> 
testWithViewsAndIndex_BaseTableChange(Connection conn, Connection conn2, String 
schemaName, String tableName, String viewName1, String v1_indexName1, String 
v1_indexName2, String viewName2, String v2_indexName1) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullViewName1 = SchemaUtil.getTableName(schemaName, viewName1);
+        String fullViewName2 = SchemaUtil.getTableName(schemaName, viewName2);
+        createTable(conn, fullTableName);
+        HashMap<String, ArrayList<String>> expected = new HashMap<>();
+        if (!createChildAfterTransform) {
+            createViewAndIndex(conn, schemaName, tableName, viewName1, 
v1_indexName1);
+            createViewAndIndex(conn, schemaName, tableName, viewName1, 
v1_indexName2);
+            createViewAndIndex(conn, schemaName, tableName, viewName2, 
v2_indexName1);
+            expected.putAll(populateView(conn, fullViewName1, 1,2));
+            expected.putAll(populateView(conn, fullViewName2, 10,2));
+        }
+
+        // Create another hbase table and add 1 more row
+        String newTableName = "NEW_TBL_" + generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, 
newTableName);
+        try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getAdmin()) {
+            String snapshotName = new 
StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), 
Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName)))
 {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"),
+                        Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("VIEW_COL1"),
+                        Bytes.toBytes("VIEW_COL1_3"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("VIEW_COL2"),
+                        Bytes.toBytes("VIEW_COL2_3"));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4", 
"VIEW_COL1_3", "VIEW_COL2_3"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + 
" ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        if (!createChildAfterTransform) {
+            assertTrue(rs1.next());
+        }
+
+        // Rename table to point to hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, 
newTableName);
+
+        conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+        if (createChildAfterTransform) {
+            createViewAndIndex(conn, schemaName, tableName, viewName1, 
v1_indexName1);
+            createViewAndIndex(conn, schemaName, tableName, viewName1, 
v1_indexName2);
+            createViewAndIndex(conn, schemaName, tableName, viewName2, 
v2_indexName1);
+            expected.putAll(populateView(conn, fullViewName1, 1,2));
+            expected.putAll(populateView(conn, fullViewName2, 10,2));
+        }
+
+        return expected;
+    }
+
+
+    private PhoenixTestBuilder.SchemaBuilder createGlobalViewAndTenantView() 
throws Exception {
+        int numOfRows = 5;
+        PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions = 
PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+        tableOptions.setTableProps(" MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0 
"+this.dataTableDdl);
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions globalViewOptions = 
PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions 
globalViewIndexOptions =
+                
PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions tenantViewOptions = 
new PhoenixTestBuilder.SchemaBuilder.TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", 
"COL9"));
+        tenantViewOptions.setTenantViewColumnTypes(asList("CHAR(15)", 
"VARCHAR", "VARCHAR", "VARCHAR"));
+
+        PhoenixTestBuilder.SchemaBuilder.OtherOptions 
testCaseWhenAllCFMatchAndAllDefault = new 
PhoenixTestBuilder.SchemaBuilder.OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, 
null, null));
+
+        // Define the test schema.
+        PhoenixTestBuilder.SchemaBuilder schemaBuilder = null;
+        if (!createChildAfterTransform) {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+        }  else {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            schemaBuilder.withTableOptions(tableOptions).build();
+        }
+
+        PTable table = schemaBuilder.getBaseTable();
+        String schemaName = table.getSchemaName().getString();
+        String tableName = table.getTableName().getString();
+        String newBaseTableName = "NEW_TBL_" + tableName;
+        String fullNewBaseTableName = SchemaUtil.getTableName(schemaName, 
newBaseTableName);
+        String fullTableName = table.getName().getString();
+
+        try (Connection conn = getConnection(props)) {
+
+            try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+                String snapshotName = new 
StringBuilder(fullTableName).append("-Snapshot").toString();
+                admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+                admin.cloneSnapshot(Bytes.toBytes(snapshotName), 
Bytes.toBytes(fullNewBaseTableName));
+            }
+
+            renameAndDropPhysicalTable(conn, null, schemaName, tableName, 
newBaseTableName);
+        }
+
+        // TODO: this still creates a new table.
+        if (createChildAfterTransform) {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            schemaBuilder.withDataOptions(schemaBuilder.getDataOptions())
+                    .withTableOptions(tableOptions)
+                    .withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+        }
+
+        // Define the test data.
+        PhoenixTestBuilder.DataSupplier dataSupplier = new 
PhoenixTestBuilder.DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format(ViewTTLIT.ID_FMT, rowIndex);
+                String zid = String.format(ViewTTLIT.ZID_FMT, rowIndex);
+                String col4 = String.format(ViewTTLIT.COL4_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col5 = String.format(ViewTTLIT.COL5_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(ViewTTLIT.COL6_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(ViewTTLIT.COL7_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(ViewTTLIT.COL8_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(ViewTTLIT.COL9_FMT, rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7, col8, 
col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        PhoenixTestBuilder.DataWriter dataWriter = new 
PhoenixTestBuilder.BasicDataWriter();
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", 
"COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions().getTenantId();
+
+        try (Connection tenantConnection = 
DriverManager.getConnection(tenantConnectUrl)) {
+            tenantConnection.setAutoCommit(true);
+            dataWriter.setConnection(tenantConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            dataWriter.upsertRows(1, numOfRows);
+            com.google.common.collect.Table<String, String, Object> 
upsertedData = dataWriter.getDataTable();;
+
+            PhoenixTestBuilder.DataReader dataReader = new 
PhoenixTestBuilder.BasicDataReader();
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s", 
Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            dataReader.setConnection(tenantConnection);
+            dataReader.readRows();
+            com.google.common.collect.Table<String, String, Object> fetchedData
+                    = dataReader.getDataTable();
+            assertNotNull("Fetched data should not be null", fetchedData);
+            ViewTTLIT.verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+
+        }
+        return schemaBuilder;
+    }
+
+    @Test
+    public void testWith2LevelViewsBaseTablePhysicalNameChange() throws 
Exception {
+        // TODO: use namespace in one of the cases
+        PhoenixTestBuilder.SchemaBuilder schemaBuilder = 
createGlobalViewAndTenantView();
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithViews() throws Exception {
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                String schemaName = "S_" + generateUniqueName();
+                String tableName = "TBL_" + generateUniqueName();
+                String view1Name = "VW1_" + generateUniqueName();
+                String view1IndexName1 = "VW1IDX1_" + generateUniqueName();
+                String view1IndexName2 = "VW1IDX2_" + generateUniqueName();
+                String fullView1IndexName1 = 
SchemaUtil.getTableName(schemaName, view1IndexName1);
+                String fullView1IndexName2 =  
SchemaUtil.getTableName(schemaName, view1IndexName2);
+                String view2Name = "VW2_" + generateUniqueName();
+                String view2IndexName1 = "VW2IDX1_" + generateUniqueName();
+                String fullView1Name = SchemaUtil.getTableName(schemaName, 
view1Name);
+                String fullView2Name = SchemaUtil.getTableName(schemaName, 
view2Name);
+                String fullView2IndexName1 =  
SchemaUtil.getTableName(schemaName, view2IndexName1);
+
+                HashMap<String, ArrayList<String>> expected = 
testWithViewsAndIndex_BaseTableChange(conn, conn2, schemaName, tableName, 
view1Name, view1IndexName1, view1IndexName2, view2Name, view2IndexName1);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, view1Name, 
view1IndexName1);
+                IndexToolIT.runIndexTool(true, false, schemaName, view1Name, 
view1IndexName2);
+                IndexToolIT.runIndexTool(true, false, schemaName, view2Name, 
view2IndexName1);
+
+                validateIndex(conn, fullView1IndexName1, true, expected);
+                validateIndex(conn2, fullView1IndexName2, true, expected);
+
+                // Add row and check
+                populateView(conn, fullView2Name, 20, 1);
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT * 
FROM " + fullView2IndexName1 + " WHERE \":PK1\"='PK20'");
+                assertEquals(true, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + 
fullView2Name  + " WHERE PK1='PK20'");
+                assertEquals(true, rs.next());
+
+                // Drop row and check
+                conn.createStatement().execute("DELETE from " + fullView2Name 
+ " WHERE PK1='PK20'");
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + 
fullView2IndexName1 + " WHERE \":PK1\"='PK20'");
+                assertEquals(false, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + 
fullView2Name  + " WHERE PK1='PK20'");
+                assertEquals(false, rs.next());
+
+                conn2.createStatement().execute("DROP VIEW " + fullView2Name);
+                // check that this view is dropped but the other is there
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + 
fullView1Name);
+                assertEquals(true, rs.next());
+                boolean failed = true;
+                try (ResultSet rsFail = 
conn.createStatement().executeQuery("SELECT * FROM " + fullView2Name)) {
+                    rsFail.next();
+                    failed = false;
+                } catch (SQLException e){
+
+                }
+                assertEquals(true, failed);
+
+                // check that first index is there but second index is dropped
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + 
fullView1IndexName1);
+                assertEquals(true, rs.next());
+                try {
+                    rs = conn.createStatement().executeQuery("SELECT * FROM " 
+ fullView2IndexName1);
+                    rs.next();
+                    failed = false;
+                } catch (SQLException e){
+
+                }
+                assertEquals(true, failed);
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithViews_runScrutiny() throws 
Exception {
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                String schemaName = "S_" + generateUniqueName();
+                String tableName = "TBL_" + generateUniqueName();
+                String view1Name = "VW1_" + generateUniqueName();
+                String view1IndexName1 = "VW1IDX1_" + generateUniqueName();
+                String view1IndexName2 = "VW1IDX2_" + generateUniqueName();
+                String view2Name = "VW2_" + generateUniqueName();
+                String view2IndexName1 = "VW2IDX1_" + generateUniqueName();
+
+                testWithViewsAndIndex_BaseTableChange(conn, conn2,schemaName, 
tableName, view1Name,
+                        view1IndexName1, view1IndexName2, view2Name, 
view2IndexName1);
+
+                List<Job>
+                        completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, 
view2Name, view2IndexName1, 1L,
+                                
IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+                if (createChildAfterTransform) {
+                    assertEquals(3, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(2, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+                } else {
+                    // Since we didn't build the index, we expect 1 missing 
index row and 2 are from the other index
+                    assertEquals(2, 
counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(3, 
counters.findCounter(INVALID_ROW_COUNT).getValue());
+                }
+
+            }
+        }
+    }
+
+    private void createTable(Connection conn, String tableName) throws 
Exception {
+        String createTableSql = "CREATE TABLE " + tableName + " (PK1 VARCHAR 
NOT NULL, V1 VARCHAR, V2 INTEGER, V3 INTEGER "
+                + "CONSTRAINT NAME_PK PRIMARY KEY(PK1)) COLUMN_ENCODED_BYTES=0 
" + dataTableDdl;
+        LOGGER.debug(createTableSql);
+        conn.createStatement().execute(createTableSql);
+    }
+
+    private void createIndexOnTable(Connection conn, String tableName, String 
indexName)
+            throws SQLException {
+        String createIndexSql = "CREATE INDEX " + indexName + " ON " + 
tableName + " (V1) INCLUDE (V2, V3) ";
+        LOGGER.debug(createIndexSql);
+        conn.createStatement().execute(createIndexSql);
+    }
+
+    private void dropIndex(Connection conn, String tableName, String indexName)
+            throws SQLException {
+        String sql = "DROP INDEX " + indexName + " ON " + tableName ;
+        conn.createStatement().execute(sql);
+    }
+
+    private HashMap<String, ArrayList<String>> populateTable(Connection conn, 
String tableName, int startnum, int numOfRows)
+            throws SQLException {
+        String upsert = "UPSERT INTO " + tableName + " (PK1, V1,  V2, V3) 
VALUES (?,?,?,?)";
+        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
+        HashMap<String, ArrayList<String>> result = new HashMap<>();
+        for (int i=startnum; i < startnum + numOfRows; i++) {
+            ArrayList<String> row = new ArrayList<>();
+            upsertStmt.setString(1, "PK" + i);
+            row.add("PK" + i);
+            upsertStmt.setString(2, "V1" + i);
+            row.add("V1" + i);
+            upsertStmt.setInt(3, i);
+            row.add(String.valueOf(i));
+            upsertStmt.setInt(4, i + 1);
+            row.add(String.valueOf(i + 1));
+            upsertStmt.executeUpdate();
+            result.put("PK" + i, row);
+        }
+        return result;
+    }
+
+    private HashMap<String, ArrayList<String>> populateView(Connection conn, 
String viewName, int startNum, int numOfRows) throws SQLException {
+        String upsert = "UPSERT INTO " + viewName + " (PK1, V1,  V2, V3, 
VIEW_COL1, VIEW_COL2) VALUES (?,?,?,?,?,?)";
+        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
+        HashMap<String, ArrayList<String>> result = new HashMap<>();
+        for (int i=startNum; i < startNum + numOfRows; i++) {
+            ArrayList<String> row = new ArrayList<>();
+            upsertStmt.setString(1, "PK"+i);
+            row.add("PK"+i);
+            upsertStmt.setString(2, "V1"+i);
+            row.add("V1"+i);
+            upsertStmt.setInt(3, i);
+            row.add(String.valueOf(i));
+            upsertStmt.setInt(4, i+1);
+            row.add(String.valueOf(i+1));
+            upsertStmt.setString(5, "VIEW_COL1_"+i);
+            row.add("VIEW_COL1_"+i);
+            upsertStmt.setString(6, "VIEW_COL2_"+i);
+            row.add("VIEW_COL2_"+i);
+            upsertStmt.executeUpdate();
+            result.put("PK"+i, row);
+        }
+        return result;
+    }
+
+    private void createViewAndIndex(Connection conn, String schemaName, String 
tableName, String viewName, String viewIndexName)
+            throws SQLException {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        String
+                view1DDL =
+                "CREATE VIEW IF NOT EXISTS " + fullViewName + " ( VIEW_COL1 
VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+                        + fullTableName;
+        conn.createStatement().execute(view1DDL);
+        String indexDDL = "CREATE INDEX IF NOT EXISTS " + viewIndexName + " ON 
" + fullViewName + " (V1) include (V2, V3, VIEW_COL2) ";
+        conn.createStatement().execute(indexDDL);
+        conn.commit();
+    }
+
+    private void validateTable(Connection connection, String tableName) throws 
SQLException {
+        String selectTable = "SELECT PK1, V1, V2, V3 FROM " + tableName + " 
ORDER BY PK1 DESC";
+        ResultSet rs = connection.createStatement().executeQuery(selectTable);
+        assertTrue(rs.next());
+        assertEquals("PK3", rs.getString(1));
+        assertEquals("V13", rs.getString(2));
+        assertEquals(3, rs.getInt(3));
+        assertEquals(4, rs.getInt(4));
+        assertTrue(rs.next());
+        assertEquals("PK2", rs.getString(1));
+        assertEquals("V12", rs.getString(2));
+        assertEquals(2, rs.getInt(3));
+        assertEquals(3, rs.getInt(4));
+        assertTrue(rs.next());
+        assertEquals("PK1", rs.getString(1));
+        assertEquals("V11", rs.getString(2));
+        assertEquals(1, rs.getInt(3));
+        assertEquals(2, rs.getInt(4));
+    }
+
+    private void validateIndex(Connection connection, String tableName, 
boolean isViewIndex, HashMap<String, ArrayList<String>> expected) throws 
SQLException {
+        String selectTable = "SELECT * FROM " + tableName;
+        ResultSet rs = connection.createStatement().executeQuery(selectTable);
+        int cnt = 0;
+        while (rs.next()) {
+            String pk = rs.getString(2);
+            assertTrue(expected.containsKey(pk));
+            ArrayList<String> row = expected.get(pk);
+            assertEquals(row.get(1), rs.getString(1));
+            assertEquals(row.get(2), rs.getString(3));
+            assertEquals(row.get(3), rs.getString(4));
+            if (isViewIndex) {
+                assertEquals(row.get(5), rs.getString(5));
+            }
+            cnt++;
+        }
+        assertEquals(cnt, expected.size());
+    }
+
+    public static void renameAndDropPhysicalTable(Connection conn, String 
tenantId, String schema, String tableName, String physicalName) throws 
Exception {
+        String
+                changeName =
+                String.format(
+                        "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, 
TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, '%s', 
'%s', NULL, NULL, '%s')",
+                        tenantId, schema, tableName, physicalName);
+        conn.createStatement().execute(changeName);
+        conn.commit();
+
+        String fullTableName = SchemaUtil.getTableName(schema, tableName);
+        Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        TableName hTableName = TableName.valueOf(fullTableName);
+        admin.disableTable(hTableName);
+        admin.deleteTable(hTableName);
+        conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .clearCache();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index 1fd6395..5284183 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -97,8 +97,8 @@ import static org.junit.Assert.fail;
 public class ViewTTLIT extends ParallelStatsDisabledIT {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ViewTTLIT.class);
     private static final String ORG_ID_FMT = "00D0x000%s";
-    private static final String ID_FMT = "00A0y000%07d";
-    private static final String ZID_FMT = "00B0y000%07d";
+    static final String ID_FMT = "00A0y000%07d";
+    static final String ZID_FMT = "00B0y000%07d";
     private static final String PHOENIX_TTL_HEADER_SQL = "SELECT PHOENIX_TTL 
FROM SYSTEM.CATALOG "
             + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND 
TABLE_TYPE = '%s'";
 
@@ -109,15 +109,15 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             = "ALTER VIEW \"%s\".\"%s\" ADD IF NOT EXISTS %s CHAR(10)";
     private static final int DEFAULT_NUM_ROWS = 5;
 
-    private static final String COL1_FMT = "a%05d";
-    private static final String COL2_FMT = "b%05d";
-    private static final String COL3_FMT = "c%05d";
-    private static final String COL4_FMT = "d%05d";
-    private static final String COL5_FMT = "e%05d";
-    private static final String COL6_FMT = "f%05d";
-    private static final String COL7_FMT = "g%05d";
-    private static final String COL8_FMT = "h%05d";
-    private static final String COL9_FMT = "i%05d";
+    static final String COL1_FMT = "a%05d";
+    static final String COL2_FMT = "b%05d";
+    static final String COL3_FMT = "c%05d";
+    static final String COL4_FMT = "d%05d";
+    static final String COL5_FMT = "e%05d";
+    static final String COL6_FMT = "f%05d";
+    static final String COL7_FMT = "g%05d";
+    static final String COL8_FMT = "h%05d";
+    static final String COL9_FMT = "i%05d";
 
     // Scans the HBase rows directly and asserts
     private void assertUsingHBaseRows(byte[] hbaseTableName,
@@ -2441,7 +2441,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    private void verifyRowsBeforeTTLExpiration(
+    static void verifyRowsBeforeTTLExpiration(
             com.google.common.collect.Table<String, String, Object> 
upsertedData,
             com.google.common.collect.Table<String, String, Object> 
fetchedData) {
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 900eec7..05f2f93 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -55,6 +55,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
@@ -321,6 +322,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private static final Cell ROW_KEY_ORDER_OPTIMIZABLE_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
     private static final Cell TRANSACTIONAL_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
TRANSACTIONAL_BYTES);
     private static final Cell TRANSACTION_PROVIDER_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
TRANSACTION_PROVIDER_BYTES);
+    private static final Cell PHYSICAL_TABLE_NAME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
PHYSICAL_TABLE_NAME_BYTES);
     private static final Cell UPDATE_CACHE_FREQUENCY_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
UPDATE_CACHE_FREQUENCY_BYTES);
     private static final Cell IS_NAMESPACE_MAPPED_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
@@ -361,6 +363,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             ROW_KEY_ORDER_OPTIMIZABLE_KV,
             TRANSACTIONAL_KV,
             TRANSACTION_PROVIDER_KV,
+            PHYSICAL_TABLE_NAME_KV,
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
@@ -407,6 +410,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private static final int STORAGE_SCHEME_INDEX = 
TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
     private static final int QUALIFIER_ENCODING_SCHEME_INDEX = 
TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
     private static final int USE_STATS_FOR_PARALLELIZATION_INDEX = 
TABLE_KV_COLUMNS.indexOf(USE_STATS_FOR_PARALLELIZATION_KV);
+    private static final int PHYSICAL_TABLE_NAME_INDEX = 
TABLE_KV_COLUMNS.indexOf(PHYSICAL_TABLE_NAME_KV);
     private static final int PHOENIX_TTL_INDEX = 
TABLE_KV_COLUMNS.indexOf(PHOENIX_TTL_KV);
     private static final int PHOENIX_TTL_HWM_INDEX = 
TABLE_KV_COLUMNS.indexOf(PHOENIX_TTL_HWM_KV);
     private static final int LAST_DDL_TIMESTAMP_INDEX =
@@ -958,6 +962,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         arguments.add(arg);
     }
 
+    private PTable getTable(byte[] tenantId, byte[] schemaName, byte[] 
tableName, long clientTimeStamp, int clientVersion)
+            throws IOException, SQLException {
+        byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, 
tableName);
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
+        // Get as of latest timestamp so we can detect if we have a newer 
table that already
+        // exists without making an additional query
+        PTable table = loadTable(env, tableKey, cacheKey, clientTimeStamp, 
HConstants.LATEST_TIMESTAMP,
+                clientVersion);
+        return table;
+    }
+
     private PTable getTable(RegionScanner scanner, long clientTimeStamp, long 
tableTimeStamp,
                             int clientVersion)
             throws IOException, SQLException {
@@ -1060,6 +1075,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         PName dataTableName =
                 dataTableNameKv != null ? 
newPName(dataTableNameKv.getValueArray(),
                         dataTableNameKv.getValueOffset(), 
dataTableNameKv.getValueLength()) : null;
+        Cell physicalTableNameKv = tableKeyValues[PHYSICAL_TABLE_NAME_INDEX];
+        PName physicalTableName =
+                physicalTableNameKv != null ? 
newPName(physicalTableNameKv.getValueArray(),
+                        physicalTableNameKv.getValueOffset(), 
physicalTableNameKv.getValueLength()) : null;
+
         Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
         PIndexState indexState =
                 indexStateKv == null ? null : 
PIndexState.fromSerializedValue(indexStateKv
@@ -1186,6 +1206,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         List<PName> physicalTables = Lists.newArrayList();
         PName parentTableName = tableType == INDEX ? dataTableName : null;
         PName parentSchemaName = tableType == INDEX ? schemaName : null;
+        PName parentLogicalName = null;
         EncodedCQCounter cqCounter =
                 (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || 
tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
                         : new EncodedCQCounter();
@@ -1209,7 +1230,30 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 if (linkType == LinkType.INDEX_TABLE) {
                     addIndexToTable(tenantId, schemaName, famName, tableName, 
clientTimeStamp, indexes, clientVersion);
                 } else if (linkType == LinkType.PHYSICAL_TABLE) {
-                    physicalTables.add(famName);
+                    // famName contains the logical name of the parent table. 
We need to get the actual physical name of the table
+                    PTable parentTable = null;
+                    if (indexType != IndexType.LOCAL) {
+                        parentTable = getTable(null, 
SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(),
+                                
SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(), 
clientTimeStamp, clientVersion);
+                        if (parentTable == null) {
+                            // parentTable is not in the cache. Since famName 
is only logical name, we need to find the physical table.
+                            try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
 {
+                                parentTable = 
PhoenixRuntime.getTableNoCache(connection, famName.getString());
+                            } catch (TableNotFoundException e) {
+                                // It is ok to swallow this exception since 
this could be a view index and _IDX_ table is not there.
+                            }
+                        }
+                    }
+
+                    if (parentTable == null) {
+                        physicalTables.add(famName);
+                        // If this is a view index, then one of the link is 
IDX_VW -> _IDX_ PhysicalTable link. Since famName is _IDX_ and we can't get 
this table hence it is null, we need to use actual view name
+                        parentLogicalName = (tableType == INDEX ? 
SchemaUtil.getTableName(parentSchemaName, parentTableName) : famName);
+                    } else {
+                        String parentPhysicalTableName = 
parentTable.getPhysicalName().getString();
+                        
physicalTables.add(PNameFactory.newName(parentPhysicalTableName));
+                        parentLogicalName = 
SchemaUtil.getTableName(parentTable.getSchemaName(), 
parentTable.getTableName());
+                    }
                 } else if (linkType == LinkType.PARENT_TABLE) {
                     parentTableName = 
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
                     parentSchemaName = 
PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
@@ -1256,6 +1300,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 .setTenantId(tenantId)
                 .setSchemaName(schemaName)
                 .setTableName(tableName)
+                .setPhysicalTableName(physicalTableName)
                 .setPkName(pkName)
                 .setDefaultFamilyName(defaultFamilyName)
                 .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
@@ -1263,6 +1308,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 .setIndexes(indexes == null ? Collections.<PTable>emptyList() 
: indexes)
                 .setParentSchemaName(parentSchemaName)
                 .setParentTableName(parentTableName)
+                .setBaseTableLogicalName(parentLogicalName)
                 .setPhysicalNames(physicalTables == null ?
                         ImmutableList.<PName>of() : 
ImmutableList.copyOf(physicalTables))
                 
.setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
@@ -1877,7 +1923,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     cPhysicalName = parentTable.getPhysicalName().getBytes();
                     cParentPhysicalName = 
parentTable.getPhysicalName().getBytes();
                 } else if (parentTable.getType() == PTableType.VIEW) {
-                    cPhysicalName = 
MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
+                    // The view index physical table name is constructed from 
logical name of base table.
+                    // For example, _IDX_SC.TBL1 is the view index name and 
SC.TBL1 is the logical name of the base table.
+                    String namepaceMappedParentLogicalName = 
MetaDataUtil.getNamespaceMappedName(parentTable.getBaseTableLogicalName(), 
isNamespaceMapped);
+                    cPhysicalName = 
MetaDataUtil.getViewIndexPhysicalName(namepaceMappedParentLogicalName.getBytes());
                     cParentPhysicalName = 
parentTable.getPhysicalName().getBytes();
                 } else {
                     cParentPhysicalName = SchemaUtil
@@ -2013,7 +2062,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     String tenantIdStr = tenantIdBytes.length == 0 ? null : 
Bytes.toString(tenantIdBytes);
                     try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
 {
                         PName physicalName = parentTable.getPhysicalName();
-                        long seqValue = getViewIndexSequenceValue(connection, 
tenantIdStr, parentTable, physicalName);
+                        long seqValue = getViewIndexSequenceValue(connection, 
tenantIdStr, parentTable);
                         Put tableHeaderPut = 
MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                         NavigableMap<byte[], List<Cell>> familyCellMap = 
tableHeaderPut.getFamilyCellMap();
                         List<Cell> cells = 
familyCellMap.get(TABLE_FAMILY_BYTES);
@@ -2152,10 +2201,15 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
-    private long getViewIndexSequenceValue(PhoenixConnection connection, 
String tenantIdStr, PTable parentTable, PName physicalName) throws SQLException 
{
+    private long getViewIndexSequenceValue(PhoenixConnection connection, 
String tenantIdStr, PTable parentTable) throws SQLException {
         int nSequenceSaltBuckets = 
connection.getQueryServices().getSequenceSaltBuckets();
-
-        SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, 
physicalName,
+        // parentTable is parent of the view index which is the view. View 
table name is _IDX_+logical name of base table
+        // Since parent is the view, the parentTable.getBaseTableLogicalName() 
returns the logical full name of the base table
+        PName parentName = parentTable.getBaseTableLogicalName();
+        if (parentName == null) {
+            parentName = 
SchemaUtil.getPhysicalHBaseTableName(parentTable.getSchemaName(), 
parentTable.getTableName(), parentTable.isNamespaceMapped());
+        }
+        SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, 
parentName,
                 nSequenceSaltBuckets, parentTable.isNamespaceMapped());
         // Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and 
incremented at the client max(SCN,dataTable.getTimestamp), but it seems we 
should
         // use always LATEST_TIMESTAMP to avoid seeing wrong sequence values 
by different connection having SCN
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index df918e5..a7abfa3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -59,7 +59,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 00babd4..b8ed43c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -321,6 +321,9 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final String TRANSACTION_PROVIDER = "TRANSACTION_PROVIDER";
     public static final byte[] TRANSACTION_PROVIDER_BYTES = 
Bytes.toBytes(TRANSACTION_PROVIDER);
 
+    public static final String PHYSICAL_TABLE_NAME = "PHYSICAL_TABLE_NAME";
+    public static final byte[] PHYSICAL_TABLE_NAME_BYTES = 
Bytes.toBytes(PHYSICAL_TABLE_NAME);
+
     public static final String UPDATE_CACHE_FREQUENCY = 
"UPDATE_CACHE_FREQUENCY";
     public static final byte[] UPDATE_CACHE_FREQUENCY_BYTES = 
Bytes.toBytes(UPDATE_CACHE_FREQUENCY);
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index af38aa4..7706be4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -351,8 +351,8 @@ public class IndexScrutinyMapper extends 
Mapper<NullWritable, PhoenixIndexDBWrit
             // i.e. without _IDX_ and with _IDX_ respectively
             physicalTable = sourcePhysicalName;
         } else {
-            table = pSourceTable.getTableName().toString();
             schema = pSourceTable.getSchemaName().toString();
+            table =  
SchemaUtil.getTableNameFromFullName(pSourceTable.getPhysicalName().getString());
             physicalTable = SchemaUtil
                     .getPhysicalHBaseTableName(schema, table, 
isNamespaceEnabled).toString();
         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index ea8aca7..1004b9b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
+import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
@@ -867,7 +868,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
-    private HTableDescriptor generateTableDescriptor(byte[] physicalTableName, 
HTableDescriptor existingDesc,
+    private HTableDescriptor generateTableDescriptor(byte[] physicalTableName, 
byte[] parentPhysicalTableName, HTableDescriptor existingDesc,
             PTableType tableType, Map<String, Object> tableProps, 
List<Pair<byte[], Map<String, Object>>> families,
             byte[][] splits, boolean isNamespaceMapped) throws SQLException {
         String defaultFamilyName = 
(String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
@@ -883,7 +884,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             if (MetaDataUtil.isViewIndex(Bytes.toString(physicalTableName))) {
                 // Handles indexes created on views for single-tenant tables 
and
                 // global indexes created on views of multi-tenant tables
-                baseTableDesc = 
this.getTableDescriptor(Bytes.toBytes(MetaDataUtil.getViewIndexUserTableName(Bytes.toString(physicalTableName))));
+                baseTableDesc = 
this.getTableDescriptor(parentPhysicalTableName);
             } else if (existingDesc == null) {
                 // Global/local index creation on top of a physical base table
                 baseTableDesc = 
this.getTableDescriptor(SchemaUtil.getPhysicalTableName(
@@ -1301,7 +1302,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
      * @return true if table was created and false if it already exists
      * @throws SQLException
      */
-    private HTableDescriptor ensureTableCreated(byte[] physicalTableName, 
PTableType tableType, Map<String, Object> props,
+    private HTableDescriptor ensureTableCreated(byte[] physicalTableName, 
byte[] parentPhysicalTableName, PTableType tableType, Map<String, Object> props,
             List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, 
boolean modifyExistingMetaData,
             boolean isNamespaceMapped, boolean isDoNotUpgradePropSet) throws 
SQLException {
         SQLException sqlE = null;
@@ -1393,7 +1394,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 }
             }
 
-            HTableDescriptor newDesc = 
generateTableDescriptor(physicalTableName, existingDesc, tableType, props, 
families,
+            HTableDescriptor newDesc = 
generateTableDescriptor(physicalTableName, parentPhysicalTableName, 
existingDesc, tableType, props, families,
                     splits, isNamespaceMapped);
 
             if (!tableExist) {
@@ -1790,13 +1791,13 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     // Our property values are translated using toString, so we need to 
"string-ify" this.
     private static final String TRUE_BYTES_AS_STRING = 
Bytes.toString(PDataType.TRUE_BYTES);
 
-    private void ensureViewIndexTableCreated(byte[] physicalTableName, 
Map<String, Object> tableProps,
+    private void ensureViewIndexTableCreated(byte[] physicalTableName, byte[] 
parentPhysicalTableName, Map<String, Object> tableProps,
             List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, 
long timestamp,
             boolean isNamespaceMapped) throws SQLException {
         byte[] physicalIndexName = 
MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
 
         tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, 
TRUE_BYTES_AS_STRING);
-        HTableDescriptor desc = ensureTableCreated(physicalIndexName, 
PTableType.TABLE, tableProps, families, splits,
+        HTableDescriptor desc = ensureTableCreated(physicalIndexName, 
parentPhysicalTableName, PTableType.TABLE, tableProps, families, splits,
                 true, isNamespaceMapped, false);
         if (desc != null) {
             if 
(!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES))))
 {
@@ -1897,7 +1898,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 (tableType != PTableType.VIEW && (physicalTableName == null || 
localIndexTable))) {
             // For views this will ensure that metadata already exists
             // For tables and indexes, this will create the metadata if it 
doesn't already exist
-            ensureTableCreated(physicalTableNameBytes, tableType, tableProps, 
families, splits, true,
+            ensureTableCreated(physicalTableNameBytes, null, tableType, 
tableProps, families, splits, true,
                     isNamespaceMapped, isDoNotUpgradePropSet);
         }
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -1906,6 +1907,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to 
attempt to create it
             if (physicalTableName != null) {
                 if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, 
kvBuilder, ptr)) {
+                    // For view index, the physical table name is _IDX_+ 
logical table name format
                     ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? 
null : PNameFactory.newName(tenantIdBytes),
                             physicalTableName, 
MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
                 }
@@ -1931,7 +1933,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     Collections.<String, Object>emptyMap()));
             }
             ensureViewIndexTableCreated(
-                physicalTableNameBytes, tableProps, familiesPlusDefault,
+                physicalTableNameBytes, physicalTableNameBytes, tableProps, 
familiesPlusDefault,
                     MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null,
                 MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
         }
@@ -2243,7 +2245,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         Map<String,Object> tableProps = 
createPropertiesMap(htableDesc.getValues());
         tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, 
table.isTransactional());
         tableProps.put(PhoenixDatabaseMetaData.IMMUTABLE_ROWS, 
table.isImmutableRows());
-        ensureViewIndexTableCreated(physicalTableName, tableProps, families, 
splits, timestamp, isNamespaceMapped);
+
+        // We got the properties of the physical base table but we need to 
create the view index table using logical name
+        byte[] viewPhysicalTableName = 
MetaDataUtil.getNamespaceMappedName(table.getName(), 
isNamespaceMapped).getBytes();
+        ensureViewIndexTableCreated(viewPhysicalTableName, physicalTableName, 
tableProps, families, splits, timestamp, isNamespaceMapped);
     }
 
     @Override
@@ -2985,7 +2990,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             tableAndIndexDescriptorMappings.put(origIndexDescriptor, 
newIndexDescriptor);
         }
         // Also keep properties for the physical view index table in sync
-        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(table.getPhysicalName().getString());
+        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(table.getName(), 
table.isNamespaceMapped());
         if (!Strings.isNullOrEmpty(viewIndexName)) {
             try {
                 HTableDescriptor origViewIndexTableDescriptor = 
this.getTableDescriptor(Bytes.toBytes(viewIndexName));
@@ -3814,6 +3819,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 }
             }
         }
+        if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0) {
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
+                    PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
+                            + " " + PVarchar.INSTANCE.getSqlTypeName());
+        }
         return metaConnection;
     }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8d970ac..cdc86bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -103,6 +103,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
@@ -360,6 +361,7 @@ public interface QueryConstants {
             COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
             USE_STATS_FOR_PARALLELIZATION + " BOOLEAN, " +
             TRANSACTION_PROVIDER + " TINYINT, " +
+            PHYSICAL_TABLE_NAME + " VARCHAR," +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + 
TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + 
COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3815e81..ff86969 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -157,6 +157,11 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public PName getBaseTableLogicalName() {
+        return delegate.getBaseTableLogicalName();
+    }
+
+    @Override
     public List<PName> getPhysicalNames() {
         return delegate.getPhysicalNames();
     }
@@ -167,6 +172,11 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public PName getPhysicalName(boolean returnColValueFromSyscat) {
+        return delegate.getPhysicalName(returnColValueFromSyscat);
+    }
+
+    @Override
     public boolean isImmutableRows() {
         return delegate.isImmutableRows();
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 448a8fc..9388fc7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -71,6 +71,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
@@ -326,9 +327,10 @@ public class MetaDataClient {
                     VIEW_INDEX_ID_DATA_TYPE +"," +
                     PHOENIX_TTL +"," +
                     PHOENIX_TTL_HWM + "," +
-                    CHANGE_DETECTION_ENABLED +
+                    CHANGE_DETECTION_ENABLED + "," +
+                    PHYSICAL_TABLE_NAME +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, " +
-                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + 
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -896,6 +898,9 @@ public class MetaDataClient {
             MetaDataMutationResult parentResult = 
updateCache(connection.getTenantId(), parentSchemaName, parentTableName,
                     false, resolvedTimestamp);
             PTable parentTable = parentResult.getTable();
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("addColumnsAndIndexesFromAncestors parent logical 
name " + table.getBaseTableLogicalName().getString() + " parent name " + 
table.getParentName().getString() + " tableName=" + table.getName());
+            }
             if (parentResult.getMutationCode() == MutationCode.TABLE_NOT_FOUND 
|| parentTable == null) {
                 // this mean the parent table was dropped and the child views 
have not yet been
                 // dropped by the TaskRegionObserver
@@ -1744,7 +1749,8 @@ public class MetaDataClient {
                 statement.getProps().put("", new 
Pair<String,Object>(DEFAULT_COLUMN_FAMILY_NAME,dataTable.getDefaultFamilyName().getString()));
             }
             PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);
-            tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, 
dataTable.getName().getString());
+
+            tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, 
dataTable.getPhysicalName().getString());
             CreateTableStatement tableStatement = 
FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, 
statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, 
null, statement.getBindCount(), null);
             table = createTableInternal(tableStatement, splits, dataTable, 
null, null, getViewIndexDataType() ,null, null, allocateIndexId, 
statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
         }
@@ -2136,7 +2142,11 @@ public class MetaDataClient {
                     } else {
                         defaultFamilyName = parent.getDefaultFamilyName() == 
null ? QueryConstants.DEFAULT_COLUMN_FAMILY : 
parent.getDefaultFamilyName().getString();
                         // Set physical name of view index table
-                        physicalNames = 
Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+                        // Parent is a view and this is an index so we need to 
get _IDX_+logical name of base table.
+                        // parent.getPhysicalName is Schema.Physical of base 
and we can't use it since the _IDX_ table is logical name of the base.
+                        // parent.getName is the view name. 
parent.getBaseTableLogicalName is the logical name of the base table
+                        PName parentName = parent.getBaseTableLogicalName();
+                        physicalNames = 
Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(parentName,
 isNamespaceMapped)));
                     }
                 }
 
@@ -2165,7 +2175,7 @@ public class MetaDataClient {
                 linkStatement.setLong(6, parent.getSequenceNumber());
                 linkStatement.setString(7, 
PTableType.INDEX.getSerializedValue());
                 linkStatement.execute();
-                
+
                 // Add row linking index table to parent table for indexes on 
views
                 if (parent.getType() == PTableType.VIEW) {
                        linkStatement = 
connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK);
@@ -2247,6 +2257,7 @@ public class MetaDataClient {
                 updateCacheFrequency = updateCacheFrequencyProp;
             }
 
+            String physicalTableName = (String) 
TableProperty.PHYSICAL_TABLE_NAME.getValue(tableProps);
             String autoPartitionSeq = (String) 
TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps);
             Long guidePostsWidth = (Long) 
TableProperty.GUIDE_POSTS_WIDTH.getValue(tableProps);
 
@@ -2486,9 +2497,11 @@ public class MetaDataClient {
                         linkStatement.setString(4, physicalName.getString());
                         linkStatement.setByte(5, 
LinkType.PHYSICAL_TABLE.getSerializedValue());
                         if (tableType == PTableType.VIEW) {
-                            PTable physicalTable = connection.getTable(new 
PTableKey(null, physicalName.getString()
+                            PTable logicalTable = connection.getTable(new 
PTableKey(null, physicalName.getString()
                                     
.replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR)));
-                            linkStatement.setLong(6, 
physicalTable.getSequenceNumber());
+                            // Set link to logical name
+                            linkStatement.setString(4, 
SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
+                            linkStatement.setLong(6, 
logicalTable.getSequenceNumber());
                             linkStatement.setString(7, null);
                         } else {
                             linkStatement.setLong(6, 
parent.getSequenceNumber());
@@ -3036,6 +3049,12 @@ public class MetaDataClient {
                 tableUpsert.setBoolean(32, isChangeDetectionEnabledProp);
             }
 
+            if (physicalTableName == null){
+                tableUpsert.setNull(33, Types.VARCHAR);
+            } else {
+                tableUpsert.setString(33, physicalTableName);
+            }
+
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -3071,6 +3090,10 @@ public class MetaDataClient {
             }
 
                        // Modularized this code for unit testing
+            PName parentName = physicalNames !=null && physicalNames.size() > 
0 ? physicalNames.get(0) : null;
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("createTable tableName=" + tableName + " parent=" 
+ (parent == null ? "" : parent.getTableName() + "-" + 
parent.getPhysicalName()) + " parent physical=" + parentName + "-" + 
(physicalNames.size() > 0 ? physicalNames.get(0) : "null") + " viewType " + 
viewType + allocateIndexId);
+            }
             MetaDataMutationResult result = 
connection.getQueryServices().createTable(tableMetaData
                 ,viewType == ViewType.MAPPED || allocateIndexId ? 
physicalNames.get(0).getBytes()
                 : null, tableType, tableProps, familyPropList, splits, 
isNamespaceMapped,
@@ -3616,12 +3639,13 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.getImmutableStorageScheme(),
                 metaPropertiesEvaluated.getUseStatsForParallelization(),
                 metaPropertiesEvaluated.getPhoenixTTL(),
-                metaPropertiesEvaluated.isChangeDetectionEnabled());
+                metaPropertiesEvaluated.isChangeDetectionEnabled(),
+                metaPropertiesEvaluated.getPhysicalTableName());
     }
 
-    private  long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency, Long 
phoenixTTL) throws SQLException {
+    private  long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency, Long 
phoenixTTL, String physicalTableName) throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, null,
-            updateCacheFrequency, null, null, null, null, -1L, null, null, 
null,phoenixTTL, false);
+            updateCacheFrequency, null, null, null, null, -1L, null, null, 
null,phoenixTTL, false, physicalTableName);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta,
@@ -3629,7 +3653,7 @@ public class MetaDataClient {
             Long updateCacheFrequency, Boolean isImmutableRows, Boolean 
disableWAL,
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema,
             ImmutableStorageScheme immutableStorageScheme, Boolean 
useStatsForParallelization,
-            Long phoenixTTL, Boolean isChangeDetectionEnabled)
+            Long phoenixTTL, Boolean isChangeDetectionEnabled, String 
physicalTableName)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3688,6 +3712,9 @@ public class MetaDataClient {
         if (isChangeDetectionEnabled != null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, 
CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
         }
+        if (!Strings.isNullOrEmpty(physicalTableName)) {
+            mutateStringProperty(tenantId, schemaName, tableName, 
PHYSICAL_TABLE_NAME, physicalTableName);
+        }
         return seqNum;
     }
 
@@ -4043,7 +4070,8 @@ public class MetaDataClient {
                         incrementTableSeqNum(index, index.getType(), 
numPkColumnsAdded,
                                 metaProperties.getNonTxToTx() ? Boolean.TRUE : 
null,
                                 
metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getPhoenixTTL());
+                                metaPropertiesEvaluated.getPhoenixTTL(),
+                                metaProperties.getPhysicalTableName());
                     }
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
@@ -4054,7 +4082,8 @@ public class MetaDataClient {
                         incrementTableSeqNum(index, index.getType(), 
columnDefs.size(),
                                 Boolean.FALSE,
                                 
metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getPhoenixTTL());
+                                metaPropertiesEvaluated.getPhoenixTTL(),
+                                
metaPropertiesEvaluated.getPhysicalTableName());
                     }
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
@@ -4513,7 +4542,7 @@ public class MetaDataClient {
                         }
                     }
                     if (!indexColumnsToDrop.isEmpty()) {
-                        long indexTableSeqNum = incrementTableSeqNum(index, 
index.getType(), -indexColumnsToDrop.size(), null, null, null);
+                        long indexTableSeqNum = incrementTableSeqNum(index, 
index.getType(), -indexColumnsToDrop.size(), null, null, null, null);
                         dropColumnMutations(index, indexColumnsToDrop);
                         long clientTimestamp = 
MutationState.getTableTimestamp(timeStamp, connection.getSCN());
                         connection.removeColumn(tenantId, 
index.getName().getString(),
@@ -4524,7 +4553,7 @@ public class MetaDataClient {
                 
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
-                long seqNum = incrementTableSeqNum(table, 
statement.getTableType(), -tableColumnsToDrop.size(), null, null, null);
+                long seqNum = incrementTableSeqNum(table, 
statement.getTableType(), -tableColumnsToDrop.size(), null, null, null, null);
                 
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
@@ -5206,6 +5235,8 @@ public class MetaDataClient {
                         
metaProperties.setTransactionProviderProp((TransactionFactory.Provider) value);
                     } else if (propName.equals(UPDATE_CACHE_FREQUENCY)) {
                         
metaProperties.setUpdateCacheFrequencyProp((Long)value);
+                    } else if (propName.equals(PHYSICAL_TABLE_NAME)) {
+                        metaProperties.setPhysicalTableNameProp((String) 
value);
                     } else if (propName.equals(GUIDE_POSTS_WIDTH)) {
                         metaProperties.setGuidePostWidth((Long)value);
                     } else if (propName.equals(APPEND_ONLY_SCHEMA)) {
@@ -5218,6 +5249,8 @@ public class MetaDataClient {
                         metaProperties.setPhoenixTTL((Long)value);
                     } else if 
(propName.equalsIgnoreCase(CHANGE_DETECTION_ENABLED)) {
                         metaProperties.setChangeDetectionEnabled((Boolean) 
value);
+                    } else if (propName.equalsIgnoreCase(PHYSICAL_TABLE_NAME)) 
{
+                        metaProperties.setPhysicalTableName((String) value);
                     }
                 }
                 // if removeTableProps is true only add the property if it is 
not an HTable or Phoenix Table property
@@ -5380,6 +5413,13 @@ public class MetaDataClient {
             }
         }
 
+        if (!Strings.isNullOrEmpty(metaProperties.getPhysicalTableName())) {
+            if 
(!metaProperties.getPhysicalTableName().equals(table.getPhysicalName(true))) {
+                
metaPropertiesEvaluated.setPhysicalTableName(metaProperties.getPhysicalTableName());
+                changingPhoenixTableProperty = true;
+            }
+        }
+
         return changingPhoenixTableProperty;
     }
 
@@ -5391,6 +5431,7 @@ public class MetaDataClient {
         private TransactionFactory.Provider transactionProviderProp = null;
         private Boolean isTransactionalProp = null;
         private Long updateCacheFrequencyProp = null;
+        private String physicalTableNameProp = null;
         private Boolean appendOnlySchemaProp = null;
         private Long guidePostWidth = -1L;
         private ImmutableStorageScheme immutableStorageSchemeProp = null;
@@ -5398,6 +5439,7 @@ public class MetaDataClient {
         private boolean nonTxToTx = false;
         private Long phoenixTTL = null;
         private Boolean isChangeDetectionEnabled = null;
+        private String physicalTableName = null;
 
         public Boolean getImmutableRowsProp() {
             return isImmutableRowsProp;
@@ -5447,6 +5489,14 @@ public class MetaDataClient {
             this.isTransactionalProp = isTransactionalProp;
         }
 
+        public void setPhysicalTableNameProp(String physicalTableNameProp) {
+            this.physicalTableNameProp = physicalTableNameProp;
+        }
+
+        public String gethysicalTableNameProp() {
+            return this.physicalTableNameProp;
+        }
+
         public Long getUpdateCacheFrequencyProp() {
             return updateCacheFrequencyProp;
         }
@@ -5507,6 +5557,14 @@ public class MetaDataClient {
         public void setChangeDetectionEnabled(Boolean 
isChangeDetectionEnabled) {
             this.isChangeDetectionEnabled = isChangeDetectionEnabled;
         }
+
+        public String getPhysicalTableName() {
+            return physicalTableName;
+        }
+
+        public void setPhysicalTableName(String physicalTableName) {
+            this.physicalTableName = physicalTableName;
+        }
     }
 
     private static class MetaPropertiesEvaluated {
@@ -5523,6 +5581,7 @@ public class MetaDataClient {
         private TransactionFactory.Provider transactionProvider = null;
         private Long phoenixTTL = null;
         private Boolean isChangeDetectionEnabled = null;
+        private String physicalTableName = null;
 
         public Boolean getIsImmutableRows() {
             return isImmutableRows;
@@ -5623,5 +5682,13 @@ public class MetaDataClient {
         public void setChangeDetectionEnabled(Boolean 
isChangeDetectionEnabled) {
             this.isChangeDetectionEnabled = isChangeDetectionEnabled;
         }
+
+        public String getPhysicalTableName() {
+            return physicalTableName;
+        }
+
+        public void setPhysicalTableName(String physicalTableName) {
+            this.physicalTableName = physicalTableName;
+        }
     }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 3ce70ea..99d874f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -21,15 +21,18 @@ import static 
org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TimeKeeper;
 
 import com.google.common.collect.Lists;
@@ -43,6 +46,7 @@ public class PMetaDataImpl implements PMetaData {
     private PMetaDataCache metaData;
     private final TimeKeeper timeKeeper;
     private final PTableRefFactory tableRefFactory;
+    private HashMap<String, PTableKey> physicalNameToLogicalTableMap = new 
HashMap<>();
     
     public PMetaDataImpl(int initialCapacity, ReadOnlyProps props) {
         this(initialCapacity, TimeKeeper.SYSTEM, props);
@@ -68,6 +72,9 @@ public class PMetaDataImpl implements PMetaData {
     
     @Override
     public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
+        if (physicalNameToLogicalTableMap.containsKey(key.getName())) {
+            key = physicalNameToLogicalTableMap.get(key.getName());
+        }
         PTableRef ref = metaData.get(key);
         if (ref == null) {
             throw new TableNotFoundException(key.getName());
@@ -147,6 +154,12 @@ public class PMetaDataImpl implements PMetaData {
         for (PTable index : table.getIndexes()) {
             metaData.put(index.getKey(), tableRefFactory.makePTableRef(index, 
this.timeKeeper.getCurrentTime(), resolvedTime));
         }
+        if (table.getPhysicalName(true) != null && 
!table.getPhysicalName(true).getString().equals(table.getTableName().getString()))
 {
+            String physicalTableName =  
table.getPhysicalName(true).getString().replace(
+                    QueryConstants.NAMESPACE_SEPARATOR, 
QueryConstants.NAME_SEPARATOR);
+            String physicalTableFullName = 
SchemaUtil.getTableName(table.getSchemaName() != null ? 
table.getSchemaName().getString() : null, physicalTableName);
+            this.physicalNameToLogicalTableMap.put(physicalTableFullName, key);
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 103c65f..70b1939 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -728,6 +728,14 @@ public interface PTable extends PMetaDataEntity {
      * (use @getPhysicalTableName for this case) 
      */
     PName getParentTableName();
+
+    /**
+     * @return the logical full name of the base table. In case of the view 
index, it is the _IDX_+logical name of base table
+     * Ex: For hierarchical views like tableLogicalName --> view1 --> view2, 
for view2, returns sc.tableLogicalName
+     * For view2, getParentTableName returns view1 and getBaseTableLogicalName 
returns sc.tableLogicalName
+     */
+    PName getBaseTableLogicalName();
+
     /**
      * @return the schema name of the parent view for a view or data table for 
an index table 
      * or null if this is not a view or index table. Also returns null for 
view of a data table 
@@ -747,6 +755,13 @@ public interface PTable extends PMetaDataEntity {
      * @return the name of the physical HBase table storing the data.
      */
     PName getPhysicalName();
+    /**
+     * If returnColValueFromSyscat is true, returns the column value set in 
the syscat.
+     * Otherwise, behaves like getPhysicalName()
+     * @return the name of the physical HBase table storing the data.
+     */
+    PName getPhysicalName(boolean returnColValueFromSyscat);
+
     boolean isImmutableRows();
 
     boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection 
connection);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 799d28a..7adcc12 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -29,6 +29,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
@@ -103,6 +104,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -122,7 +124,6 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-
 /**
  *
  * Base class for PTable implementors.  Provides abstraction for
@@ -144,6 +145,7 @@ public class PTableImpl implements PTable {
     private final PName name;
     private final PName schemaName;
     private final PName tableName;
+    private final PName physicalTableNameColumnInSyscat;
     private final PName tenantId;
     private final PTableType type;
     private final PIndexState state;
@@ -169,6 +171,7 @@ public class PTableImpl implements PTable {
     private final PName parentName;
     private final PName parentSchemaName;
     private final PName parentTableName;
+    private final PName baseTableLogicalName;
     private final List<PName> physicalNames;
     private final boolean isImmutableRows;
     private final PName defaultFamilyName;
@@ -206,6 +209,7 @@ public class PTableImpl implements PTable {
         private PName name;
         private PName schemaName = PName.EMPTY_NAME;
         private PName tableName = PName.EMPTY_NAME;
+        private PName physicalTableName = PName.EMPTY_NAME;
         private PName tenantId;
         private PTableType type;
         private PIndexState state;
@@ -227,6 +231,7 @@ public class PTableImpl implements PTable {
         private PName parentName;
         private PName parentSchemaName;
         private PName parentTableName;
+        private PName baseTableLogicalName;
         private List<PName> physicalNames;
         private boolean isImmutableRows;
         private IndexMaintainer indexMaintainer;
@@ -404,6 +409,11 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setBaseTableLogicalName(PName baseTableLogicalName) {
+            this.baseTableLogicalName = baseTableLogicalName;
+            return this;
+        }
+
         public Builder setPhysicalNames(List<PName> physicalNames) {
             this.physicalNames = physicalNames;
             return this;
@@ -508,6 +518,14 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setPhysicalTableName(PName physicalTableName) {
+            if (physicalTableName != null) {
+                propertyValues.put(PHYSICAL_TABLE_NAME, 
String.valueOf(physicalTableName));
+            }
+            this.physicalTableName = physicalTableName;
+            return this;
+        }
+
         public Builder setUpdateCacheFrequency(long updateCacheFrequency) {
             propertyValues.put(UPDATE_CACHE_FREQUENCY, 
String.valueOf(updateCacheFrequency));
             this.updateCacheFrequency = updateCacheFrequency;
@@ -818,6 +836,7 @@ public class PTableImpl implements PTable {
         this.name = builder.name;
         this.schemaName = builder.schemaName;
         this.tableName = builder.tableName;
+        this.physicalTableNameColumnInSyscat = builder.physicalTableName;
         this.tenantId = builder.tenantId;
         this.type = builder.type;
         this.state = builder.state;
@@ -839,6 +858,7 @@ public class PTableImpl implements PTable {
         this.parentName = builder.parentName;
         this.parentSchemaName = builder.parentSchemaName;
         this.parentTableName = builder.parentTableName;
+        this.baseTableLogicalName = builder.baseTableLogicalName;
         this.physicalNames = builder.physicalNames;
         this.isImmutableRows = builder.isImmutableRows;
         this.indexMaintainer = builder.indexMaintainer;
@@ -928,6 +948,7 @@ public class PTableImpl implements PTable {
                 .setTenantId(table.getTenantId())
                 .setSchemaName(table.getSchemaName())
                 .setTableName(table.getTableName())
+                .setPhysicalTableName(table.getPhysicalName(true))
                 .setPkName(table.getPKName())
                 .setDefaultFamilyName(table.getDefaultFamilyName())
                 .setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable())
@@ -936,6 +957,7 @@ public class PTableImpl implements PTable {
                         Collections.<PTable>emptyList() : table.getIndexes())
                 .setParentSchemaName(table.getParentSchemaName())
                 .setParentTableName(table.getParentTableName())
+                .setBaseTableLogicalName(table.getBaseTableLogicalName())
                 .setPhysicalNames(table.getPhysicalNames() == null ?
                         ImmutableList.<PName>of() : 
ImmutableList.copyOf(table.getPhysicalNames()))
                 .setViewModifiedUseStatsForParallelization(table
@@ -1559,7 +1581,28 @@ public class PTableImpl implements PTable {
     @Override
     public PName getParentName() {
         // a view on a table will not have a parent name but will have a 
physical table name (which is the parent)
-        return (type!=PTableType.VIEW || parentName!=null) ? parentName : 
getPhysicalName();
+        return (type!=PTableType.VIEW || parentName!=null) ? parentName :
+                ((baseTableLogicalName != null && 
!Strings.isNullOrEmpty(baseTableLogicalName.getString()))? baseTableLogicalName
+                        : getPhysicalName());
+    }
+
+    @Override
+    public PName getBaseTableLogicalName() {
+        PName result = null;
+        if (baseTableLogicalName != null && 
!Strings.isNullOrEmpty(baseTableLogicalName.getString())) {
+            result = baseTableLogicalName;
+        } else {
+            if (parentName != null) {
+                result = parentName;
+            } else {
+                if (type == PTableType.VIEW) {
+                    result = getPhysicalName();
+                } else if (type == PTableType.INDEX) {
+                    result = SchemaUtil.getTableName(parentSchemaName, 
parentTableName);
+                }
+            }
+        }
+        return result;
     }
 
     @Override
@@ -1586,7 +1629,14 @@ public class PTableImpl implements PTable {
 
     @Override
     public PName getPhysicalName() {
+        // For views, physicalName is base table physical name. There might be 
a case where the Phoenix table is pointing to another physical table.
+        // In that case, physicalTableName is not null
         if (physicalNames.isEmpty()) {
+            if (physicalTableNameColumnInSyscat != null && 
!Strings.isNullOrEmpty(
+                    physicalTableNameColumnInSyscat.getString())) {
+                return SchemaUtil.getPhysicalHBaseTableName(schemaName,
+                        physicalTableNameColumnInSyscat, isNamespaceMapped);
+            }
             return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, 
isNamespaceMapped);
         } else {
             return PNameFactory.newName(physicalNames.get(0).getBytes());
@@ -1594,6 +1644,15 @@ public class PTableImpl implements PTable {
     }
 
     @Override
+    public PName getPhysicalName(boolean returnColValueFromSyscat) {
+        if (returnColValueFromSyscat) {
+            return physicalTableNameColumnInSyscat;
+        } else {
+            return getPhysicalName();
+        }
+    }
+
+    @Override
     public List<PName> getPhysicalNames() {
         return !physicalNames.isEmpty() ? physicalNames : 
Lists.newArrayList(getPhysicalName());
     }
@@ -1646,6 +1705,10 @@ public class PTableImpl implements PTable {
         }
         PName schemaName = 
PNameFactory.newName(table.getSchemaNameBytes().toByteArray());
         PName tableName = 
PNameFactory.newName(table.getTableNameBytes().toByteArray());
+        PName physicalTableName = null;
+        if (table.getPhysicalTableNameBytes() != null) {
+            physicalTableName = 
PNameFactory.newName(table.getPhysicalTableNameBytes().toByteArray());
+        }
         PTableType tableType = 
PTableType.values()[table.getTableType().ordinal()];
         PIndexState indexState = null;
         if (table.hasIndexState()) {
@@ -1682,10 +1745,14 @@ public class PTableImpl implements PTable {
         boolean isImmutableRows = table.getIsImmutableRows();
         PName parentSchemaName = null;
         PName parentTableName = null;
+        PName parentLogicalName = null;
         if (table.hasParentNameBytes()) {
             parentSchemaName = 
PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName((table.getParentNameBytes().toByteArray())));
             parentTableName = 
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(table.getParentNameBytes().toByteArray()));
         }
+        if (table.getBaseTableLogicalNameBytes() != null) {
+            parentLogicalName = 
PNameFactory.newName(table.getBaseTableLogicalNameBytes().toByteArray());
+        }
         PName defaultFamilyName = null;
         if (table.hasDefaultFamilyName()) {
             defaultFamilyName = 
PNameFactory.newName(table.getDefaultFamilyName().toByteArray());
@@ -1829,6 +1896,7 @@ public class PTableImpl implements PTable {
                     .setTenantId(tenantId)
                     .setSchemaName(schemaName)
                     .setTableName(tableName)
+                    .setPhysicalTableName(physicalTableName)
                     .setPkName(pkName)
                     .setDefaultFamilyName(defaultFamilyName)
                     .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
@@ -1836,6 +1904,7 @@ public class PTableImpl implements PTable {
                     .setIndexes(indexes == null ? 
Collections.<PTable>emptyList() : indexes)
                     .setParentSchemaName(parentSchemaName)
                     .setParentTableName(parentTableName)
+                    .setBaseTableLogicalName(parentLogicalName)
                     .setPhysicalNames(physicalNames == null ?
                             ImmutableList.<PName>of() : 
ImmutableList.copyOf(physicalNames))
                     .setColumns(columns)
@@ -1851,112 +1920,126 @@ public class PTableImpl implements PTable {
     }
 
     public static PTableProtos.PTable toProto(PTable table) {
-      PTableProtos.PTable.Builder builder = PTableProtos.PTable.newBuilder();
-      if(table.getTenantId() != null){
-        builder.setTenantId(ByteStringer.wrap(table.getTenantId().getBytes()));
-      }
-      
builder.setSchemaNameBytes(ByteStringer.wrap(table.getSchemaName().getBytes()));
-      
builder.setTableNameBytes(ByteStringer.wrap(table.getTableName().getBytes()));
-      builder.setTableType(ProtobufUtil.toPTableTypeProto(table.getType()));
-      if (table.getType() == PTableType.INDEX) {
-        if(table.getIndexState() != null) {
-          builder.setIndexState(table.getIndexState().getSerializedValue());
-        }
-        if(table.getViewIndexId() != null) {
-          builder.setViewIndexId(table.getViewIndexId());
-          builder.setViewIndexIdType(table.getviewIndexIdType().getSqlType());
-               }
-        if(table.getIndexType() != null) {
-            builder.setIndexType(ByteStringer.wrap(new 
byte[]{table.getIndexType().getSerializedValue()}));
-        }
-      }
-      builder.setSequenceNumber(table.getSequenceNumber());
-      builder.setTimeStamp(table.getTimeStamp());
-      PName tmp = table.getPKName();
-      if (tmp != null) {
-        builder.setPkNameBytes(ByteStringer.wrap(tmp.getBytes()));
-      }
-      Integer bucketNum = table.getBucketNum();
-      int offset = 0;
-      if(bucketNum == null){
-        builder.setBucketNum(NO_SALTING);
-      } else {
-        offset = 1;
-        builder.setBucketNum(bucketNum);
-      }
-      List<PColumn> columns = table.getColumns();
-      int columnSize = columns.size();
-      for (int i = offset; i < columnSize; i++) {
-          PColumn column = columns.get(i);
-          builder.addColumns(PColumnImpl.toProto(column));
-      }
-      List<PTable> indexes = table.getIndexes();
-      for (PTable curIndex : indexes) {
-        builder.addIndexes(toProto(curIndex));
-      }
-      builder.setIsImmutableRows(table.isImmutableRows());
-      // TODO remove this field in 5.0 release
-      if (table.getParentName() != null) {
-          
builder.setDataTableNameBytes(ByteStringer.wrap(table.getParentTableName().getBytes()));
-      }
-      if (table.getParentName() !=null) {
-          
builder.setParentNameBytes(ByteStringer.wrap(table.getParentName().getBytes()));
-      }
-      if (table.getDefaultFamilyName()!= null) {
-        
builder.setDefaultFamilyName(ByteStringer.wrap(table.getDefaultFamilyName().getBytes()));
-      }
-      builder.setDisableWAL(table.isWALDisabled());
-      builder.setMultiTenant(table.isMultiTenant());
-      builder.setStoreNulls(table.getStoreNulls());
-      if (table.getTransactionProvider() != null) {
-          
builder.setTransactionProvider(table.getTransactionProvider().getCode());
-      }
-      if(table.getType() == PTableType.VIEW){
-        builder.setViewType(ByteStringer.wrap(new 
byte[]{table.getViewType().getSerializedValue()}));
-      }
-      if(table.getViewStatement()!=null){
-        
builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement())));
-      }
-      for (int i = 0; i < table.getPhysicalNames().size(); i++) {
-        
builder.addPhysicalNames(ByteStringer.wrap(table.getPhysicalNames().get(i).getBytes()));
-      }
-      builder.setBaseColumnCount(table.getBaseColumnCount());
-      builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable());
-      builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
-      builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
-      builder.setIsNamespaceMapped(table.isNamespaceMapped());
-      if (table.getAutoPartitionSeqName() != null) {
-          builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
-      }
-      builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
-      if (table.getImmutableStorageScheme() != null) {
-          builder.setStorageScheme(ByteStringer.wrap(new 
byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()}));
-      }
-      if (table.getEncodedCQCounter() != null) {
-          Map<String, Integer> values = table.getEncodedCQCounter().values();
-          for (Entry<String, Integer> cqCounter : values.entrySet()) {
-              
org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.Builder 
cqBuilder = 
org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.newBuilder();
-              cqBuilder.setColFamily(cqCounter.getKey());
-              cqBuilder.setCounter(cqCounter.getValue());
-              builder.addEncodedCQCounters(cqBuilder.build());
-          }
-      }
-      if (table.getEncodingScheme() != null) {
-          builder.setEncodingScheme(ByteStringer.wrap(new 
byte[]{table.getEncodingScheme().getSerializedMetadataValue()}));
-      }
-      if (table.useStatsForParallelization() != null) {
-          
builder.setUseStatsForParallelization(table.useStatsForParallelization());
-      }
-      builder.setPhoenixTTL(table.getPhoenixTTL());
-      builder.setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark());
-      
builder.setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency());
-      
builder.setViewModifiedUseStatsForParallelization(table.hasViewModifiedUseStatsForParallelization());
-      builder.setViewModifiedPhoenixTTL(table.hasViewModifiedPhoenixTTL());
-      if (table.getLastDDLTimestamp() != null) {
-          builder.setLastDDLTimestamp(table.getLastDDLTimestamp());
-      }
-      builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
-      return builder.build();
+        PTableProtos.PTable.Builder builder = PTableProtos.PTable.newBuilder();
+        if (table.getTenantId() != null) {
+            
builder.setTenantId(ByteStringer.wrap(table.getTenantId().getBytes()));
+        }
+        
builder.setSchemaNameBytes(ByteStringer.wrap(table.getSchemaName().getBytes()));
+        
builder.setTableNameBytes(ByteStringer.wrap(table.getTableName().getBytes()));
+        if (table.getPhysicalName(true) == null) {
+            
builder.setPhysicalTableNameBytes(ByteStringer.wrap(table.getTableName().getBytes()));
+        } else {
+            
builder.setPhysicalTableNameBytes(ByteStringer.wrap(table.getPhysicalName(true).getBytes()));
+        }
+        builder.setTableType(ProtobufUtil.toPTableTypeProto(table.getType()));
+        if (table.getType() == PTableType.INDEX) {
+            if (table.getIndexState() != null) {
+                
builder.setIndexState(table.getIndexState().getSerializedValue());
+            }
+            if (table.getViewIndexId() != null) {
+                builder.setViewIndexId(table.getViewIndexId());
+                
builder.setViewIndexIdType(table.getviewIndexIdType().getSqlType());
+            }
+            if (table.getIndexType() != null) {
+                builder.setIndexType(ByteStringer
+                        .wrap(new byte[] { 
table.getIndexType().getSerializedValue() }));
+            }
+        }
+        builder.setSequenceNumber(table.getSequenceNumber());
+        builder.setTimeStamp(table.getTimeStamp());
+        PName tmp = table.getPKName();
+        if (tmp != null) {
+            builder.setPkNameBytes(ByteStringer.wrap(tmp.getBytes()));
+        }
+        Integer bucketNum = table.getBucketNum();
+        int offset = 0;
+        if (bucketNum == null) {
+            builder.setBucketNum(NO_SALTING);
+        } else {
+            offset = 1;
+            builder.setBucketNum(bucketNum);
+        }
+        List<PColumn> columns = table.getColumns();
+        int columnSize = columns.size();
+        for (int i = offset; i < columnSize; i++) {
+            PColumn column = columns.get(i);
+            builder.addColumns(PColumnImpl.toProto(column));
+        }
+        List<PTable> indexes = table.getIndexes();
+        for (PTable curIndex : indexes) {
+            builder.addIndexes(toProto(curIndex));
+        }
+        builder.setIsImmutableRows(table.isImmutableRows());
+        // TODO remove this field in 5.0 release
+        if (table.getParentName() != null) {
+            
builder.setDataTableNameBytes(ByteStringer.wrap(table.getParentTableName().getBytes()));
+        }
+        if (table.getParentName() != null) {
+            
builder.setParentNameBytes(ByteStringer.wrap(table.getParentName().getBytes()));
+        }
+        if (table.getBaseTableLogicalName() != null) {
+            
builder.setBaseTableLogicalNameBytes(ByteStringer.wrap(table.getBaseTableLogicalName().getBytes()));
+        }
+        if (table.getDefaultFamilyName() != null) {
+            
builder.setDefaultFamilyName(ByteStringer.wrap(table.getDefaultFamilyName().getBytes()));
+        }
+        builder.setDisableWAL(table.isWALDisabled());
+        builder.setMultiTenant(table.isMultiTenant());
+        builder.setStoreNulls(table.getStoreNulls());
+        if (table.getTransactionProvider() != null) {
+            
builder.setTransactionProvider(table.getTransactionProvider().getCode());
+        }
+        if (table.getType() == PTableType.VIEW) {
+            builder.setViewType(
+                    ByteStringer.wrap(new byte[] { 
table.getViewType().getSerializedValue() }));
+        }
+        if (table.getViewStatement() != null) {
+            
builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement())));
+        }
+        for (int i = 0; i < table.getPhysicalNames().size(); i++) {
+            
builder.addPhysicalNames(ByteStringer.wrap(table.getPhysicalNames().get(i).getBytes()));
+        }
+        builder.setBaseColumnCount(table.getBaseColumnCount());
+        builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable());
+        builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
+        builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
+        builder.setIsNamespaceMapped(table.isNamespaceMapped());
+        if (table.getAutoPartitionSeqName() != null) {
+            builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
+        }
+        builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
+        if (table.getImmutableStorageScheme() != null) {
+            builder.setStorageScheme(ByteStringer.wrap(new byte[] {
+                    
table.getImmutableStorageScheme().getSerializedMetadataValue() }));
+        }
+        if (table.getEncodedCQCounter() != null) {
+            Map<String, Integer> values = table.getEncodedCQCounter().values();
+            for (Entry<String, Integer> cqCounter : values.entrySet()) {
+                
org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.Builder
+                        cqBuilder =
+                        
org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.newBuilder();
+                cqBuilder.setColFamily(cqCounter.getKey());
+                cqBuilder.setCounter(cqCounter.getValue());
+                builder.addEncodedCQCounters(cqBuilder.build());
+            }
+        }
+        if (table.getEncodingScheme() != null) {
+            builder.setEncodingScheme(ByteStringer
+                    .wrap(new byte[] { 
table.getEncodingScheme().getSerializedMetadataValue() }));
+        }
+        if (table.useStatsForParallelization() != null) {
+            
builder.setUseStatsForParallelization(table.useStatsForParallelization());
+        }
+        builder.setPhoenixTTL(table.getPhoenixTTL());
+        builder.setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark());
+        
builder.setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency());
+        
builder.setViewModifiedUseStatsForParallelization(table.hasViewModifiedUseStatsForParallelization());
+        builder.setViewModifiedPhoenixTTL(table.hasViewModifiedPhoenixTTL());
+        if (table.getLastDDLTimestamp() != null) {
+            builder.setLastDDLTimestamp(table.getLastDDLTimestamp());
+        }
+        builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
+        return builder.build();
     }
 
     @Override
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 435911f..4116bdd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -294,6 +294,12 @@ public enum TableProperty {
         public Object getPTableValue(PTable table) {
             return table.isChangeDetectionEnabled();
         }
+    },
+
+    PHYSICAL_TABLE_NAME(PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME, 
COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override public Object getPTableValue(PTable table) {
+            return table.getPhysicalName(true);
+        }
     }
     ;
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index c8c3f8a..d89114b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -594,6 +594,19 @@ public class MetaDataUtil {
         return getIndexPhysicalName(physicalTableName, 
VIEW_INDEX_TABLE_PREFIX);
     }
 
+    public static String getNamespaceMappedName(PName tableName, boolean 
isNamespaceMapped) {
+        String logicalName = tableName.getString();
+        if (isNamespaceMapped) {
+            logicalName = logicalName.replace(QueryConstants.NAME_SEPARATOR, 
QueryConstants.NAMESPACE_SEPARATOR);
+        }
+        return logicalName;
+    }
+
+    public static String getViewIndexPhysicalName(PName logicalTableName, 
boolean isNamespaceMapped) {
+        String logicalName = getNamespaceMappedName(logicalTableName, 
isNamespaceMapped);
+        return getIndexPhysicalName(logicalName, VIEW_INDEX_TABLE_PREFIX);
+    }
+
     private static byte[] getIndexPhysicalName(byte[] physicalTableName, 
String indexPrefix) {
         return 
Bytes.toBytes(getIndexPhysicalName(Bytes.toString(physicalTableName), 
indexPrefix));
     }
@@ -678,13 +691,13 @@ public class MetaDataUtil {
         return new SequenceKey(isNamespaceMapped ? tenantId : null, 
schemaName, tableName, nSaltBuckets);
     }
 
-    public static String getViewIndexSequenceSchemaName(PName physicalName, 
boolean isNamespaceMapped) {
+    public static String getViewIndexSequenceSchemaName(PName 
logicalBaseTableName, boolean isNamespaceMapped) {
         if (!isNamespaceMapped) {
-            String baseTableName = 
SchemaUtil.getParentTableNameFromIndexTable(physicalName.getString(),
+            String baseTableName = 
SchemaUtil.getParentTableNameFromIndexTable(logicalBaseTableName.getString(),
                 MetaDataUtil.VIEW_INDEX_TABLE_PREFIX);
             return SchemaUtil.getSchemaNameFromFullName(baseTableName);
         } else {
-            return 
SchemaUtil.getSchemaNameFromFullName(physicalName.toString());
+            return 
SchemaUtil.getSchemaNameFromFullName(logicalBaseTableName.toString());
         }
 
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 014ea24..d341420 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -328,6 +328,10 @@ public class SchemaUtil {
                 SEPARATOR_BYTE_ARRAY, Bytes.toBytes(familyName));
     }
 
+    public static PName getTableName(PName schemaName, PName tableName) {
+        return PNameFactory.newName(getName(schemaName==null? null : 
schemaName.getString(), tableName.getString(), false));
+    }
+
     public static String getTableName(String schemaName, String tableName) {
         return getName(schemaName,tableName, false);
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 22f058c..6804ae6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -1331,7 +1331,7 @@ public class UpgradeUtil {
      */
     private static void syncViewIndexTable(ConnectionQueryServices cqs, PTable 
baseTable, HColumnDescriptor defaultColFam,
             Map<String, Object> syncedProps, Set<HTableDescriptor> 
tableDescsToSync) throws SQLException {
-        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(baseTable.getPhysicalName().getString());
+        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(baseTable.getName().getString());
         if (!Strings.isNullOrEmpty(viewIndexName)) {
             try {
                 
addTableDescIfPropsChanged(cqs.getTableDescriptor(Bytes.toBytes(viewIndexName)),
@@ -2391,7 +2391,7 @@ public class UpgradeUtil {
     private static void updateIndexesSequenceIfPresent(PhoenixConnection 
connection, PTable dataTable)
             throws SQLException {
         PName tenantId = connection.getTenantId();
-        PName physicalName = dataTable.getPhysicalName();
+        PName physicalName = dataTable.getName();
         PName oldPhysicalName = PNameFactory.newName(
                 
physicalName.toString().replace(QueryConstants.NAMESPACE_SEPARATOR, 
QueryConstants.NAME_SEPARATOR));
         String oldSchemaName = 
MetaDataUtil.getViewIndexSequenceSchemaName(oldPhysicalName, false);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index c04ddc3..e48e974 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -756,6 +756,7 @@ public class ViewUtil {
                 .setAutoPartitionSeqName(parentTable.getAutoPartitionSeqName())
                 .setAppendOnlySchema(parentTable.isAppendOnlySchema())
                 .setBaseColumnCount(baseTableColumnCount)
+                .setBaseTableLogicalName(parentTable.getBaseTableLogicalName())
                 .setTimeStamp(maxTableTimestamp)
                 .setExcludedColumns(ImmutableList.copyOf(excludedColumns))
                 .setUpdateCacheFrequency(updateCacheFreq)
diff --git a/phoenix-core/src/main/protobuf/PTable.proto 
b/phoenix-core/src/main/protobuf/PTable.proto
index 0e9fe41..07fe81f 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -111,6 +111,8 @@ message PTable {
   optional bool viewModifiedPhoenixTTL = 44;
   optional int64 lastDDLTimestamp = 45;
   optional bool changeDetectionEnabled = 46;
+  optional bytes physicalTableNameBytes = 47;
+  optional bytes baseTableLogicalNameBytes = 48;
 }
 
 message EncodedCQCounter {

Reply via email to