[ https://issues.apache.org/jira/browse/PHOENIX-6247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316743#comment-17316743 ]

ASF GitHub Bot commented on PHOENIX-6247:
-----------------------------------------

gjacoby126 commented on a change in pull request #1170:
URL: https://github.com/apache/phoenix/pull/1170#discussion_r609039671



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
##########
@@ -896,6 +898,9 @@ private boolean addColumnsAndIndexesFromAncestors(MetaDataMutationResult result,
             MetaDataMutationResult parentResult = updateCache(connection.getTenantId(), parentSchemaName, parentTableName,
                     false, resolvedTimestamp);
             PTable parentTable = parentResult.getTable();
+            if (LOGGER.isDebugEnabled()) {

Review comment:
       Do we need this logging? If we keep it, should it be at TRACE level instead? This is a pretty frequently called function, so DEBUG could generate a lot of log output.
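For illustration only (not code for the PR): the guard-at-TRACE pattern means the log message is never even built unless tracing is on. This standalone sketch uses `java.util.logging` so it runs by itself (slf4j needs a binding); in MetaDataClient the equivalent would be `LOGGER.isTraceEnabled()` / `LOGGER.trace(...)`. The names here (`TraceGuard`, `costlyMessage`) are made up for the sketch.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class TraceGuard {
    static final Logger LOG = Logger.getLogger("TraceGuard");
    static int built = 0; // counts how often the costly message was constructed

    // Stand-in for an expensive log-message construction on a hot path
    static String costlyMessage() { built++; return "resolved parent table details"; }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);           // TRACE-equivalent (FINEST) disabled
        if (LOG.isLoggable(Level.FINEST)) { // guard: message is never built
            LOG.finest(costlyMessage());
        }
        System.out.println("messages built: " + built); // prints "messages built: 0"
    }
}
```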

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
##########
@@ -728,6 +728,14 @@ private static int getReservedQualifier(byte[] bytes, int offset, int length) {
      * (use @getPhysicalTableName for this case) 
      */
     PName getParentTableName();
+
+    /**
+     * @return the logical full name of the parent. In case of the view index, it is the _IDX_+logical name of base table

Review comment:
       Should be "the logical full name of the base table", not the parent 
(which may not be the base table, in the case of a child view)
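
i.e. a possible rewording (only the comment is sketched here; the added method's declaration is cut off in the hunk above):

```java
/**
 * @return the logical full name of the base table. In case of the view index,
 *         it is _IDX_ + the logical name of the base table.
 */
```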

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
##########
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.curator.shaded.com.google.common.base.Joiner;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class LogicalTableNameIT extends BaseTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogicalTableNameIT.class);
+
+    private final boolean createChildAfterTransform;
+    private final boolean immutable;
+    private String dataTableDdl;
+    public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
+    private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60*1000)); // An hour
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    public LogicalTableNameIT(boolean createChildAfterTransform, boolean immutable)  {
+        this.createChildAfterTransform = createChildAfterTransform;
+        this.immutable = immutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (immutable) {
+            optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+        }
+        this.dataTableDdl = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(
+            name = "createChildAfterTransform={0}, immutable={1}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[] { false, true };
+        for (boolean immutable : Booleans) {
+            for (boolean createAfter : Booleans) {
+                list.add(new Object[] { createAfter, immutable });
+            }
+        }
+
+        return list;
+    }
+
+    private Connection getConnection(Properties props) throws Exception {
+        props.setProperty(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Force real driver to be used as the test one doesn't handle creating
+        // more than one ConnectionQueryService
+        props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, StringUtil.EMPTY_STRING);
+        // Create new ConnectionQueryServices so that we can set DROP_METADATA_ATTRIB
+        String url = QueryUtil.getConnectionUrl(props, config, "PRINCIPAL");
+        return DriverManager.getConnection(url, props);
+    }
+
+    private  HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName) throws Exception {

Review comment:
       tiny nit: line length
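
e.g. it could wrap as (sketch of the same signature):

```java
private HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(
        Connection conn, Connection conn2, String schemaName, String tableName,
        String indexName) throws Exception {
```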

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
##########
@@ -0,0 +1,819 @@
+    private  HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        createTable(conn, fullTableName);
+        if (!createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table and add 1 more row
+        String newTableName =  NEW_TABLE_PREFIX + tableName;
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + " ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, newTableName);
+
+        if (createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, tableName, indexName);
+
+                validateTable(conn, fullTableName);
+                validateTable(conn2, fullTableName);
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // Add row and check
+                populateTable(conn, fullTableName, 10, 1);
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(true, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName  + " WHERE PK1='PK10'");
+                assertEquals(true, rs.next());
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, NEW_TABLE_PREFIX+tableName));
+                // Drop row and check
+                conn.createStatement().execute("DELETE from " + fullTableName + " WHERE PK1='PK10'");
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(false, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName  + " WHERE PK1='PK10'");
+                assertEquals(false, rs.next());
+
+                conn2.createStatement().execute("DROP TABLE " + fullTableName);
+                // check that the physical data table is dropped
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                assertEquals(false, admin.tableExists(TableName.valueOf(SchemaUtil.getTableName(schemaName,NEW_TABLE_PREFIX + tableName))));
+
+                // check that index is dropped
+                assertEquals(false, admin.tableExists(TableName.valueOf(fullIndexName)));
+
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex_runScrutiny() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, indexName));
+                List<Job>
+                        completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, tableName, indexName, 1L,
+                                IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+                if (createChildAfterTransform) {
+                    assertEquals(3, counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(0, counters.findCounter(INVALID_ROW_COUNT).getValue());
+                } else {
+                    // Since we didn't build the index, we expect 1 missing index row
+                    assertEquals(2, counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(1, counters.findCounter(INVALID_ROW_COUNT).getValue());
+                }
+            }
+        }
+    }
+
+    private  HashMap<String, ArrayList<String>> test_IndexTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName, byte[] verifiedBytes) throws Exception {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        conn.setAutoCommit(true);
+        createTable(conn, fullTableName);
+        createIndexOnTable(conn, fullTableName, indexName);
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table for index and add 1 more row
+        String newTableName = "NEW_IDXTBL_" + generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getAdmin()) {
+            String snapshotName = new StringBuilder(indexName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullIndexName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put
+                        put =
+                        new Put(ByteUtil.concat(Bytes.toBytes("V13"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        verifiedBytes);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT * FROM " + fullIndexName;
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, indexName, newTableName);
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalIndexTableName() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = test_IndexTableChange(conn, conn2, schemaName, tableName, indexName, IndexRegionObserver.VERIFIED_BYTES);
+
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // create another index and drop the first index and validate the second one
+                String indexName2 = "IDX2_" + generateUniqueName();
+                String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2);
+                if (createChildAfterTransform) {
+                    createIndexOnTable(conn2, fullTableName, indexName2);
+                }
+                dropIndex(conn2, fullTableName, indexName);
+                if (!createChildAfterTransform) {
+                    createIndexOnTable(conn2, fullTableName, indexName2);
+                }
+                // The new index doesn't have the new row
+                expected.remove("PK3");
+                validateIndex(conn, fullIndexName2, false, expected);
+                validateIndex(conn2, fullIndexName2, false, expected);
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalIndexTableName_runScrutiny() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                test_IndexTableChange(conn, conn2, schemaName, tableName, indexName, IndexRegionObserver.VERIFIED_BYTES);
+                List<Job>
+                        completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, tableName, indexName, 1L,
+                                IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+
+                // Since we didn't build the index, we expect 1 missing index row
+                assertEquals(2, counters.findCounter(VALID_ROW_COUNT).getValue());
+                assertEquals(1, counters.findCounter(INVALID_ROW_COUNT).getValue());
+
+                // Try with unverified bytes
+                String tableName2 = "TBL_" + generateUniqueName();
+                String indexName2 = "IDX_" + generateUniqueName();
+                test_IndexTableChange(conn, conn2, schemaName, tableName2, indexName2, IndexRegionObserver.UNVERIFIED_BYTES);
+
+                completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, tableName2, indexName2, 1L,
+                                IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+
+                job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                counters = job.getCounters();
+
+                // Since we didn't build the index, we expect 1 missing index row
+                assertEquals(2, counters.findCounter(VALID_ROW_COUNT).getValue());
+                assertEquals(0, counters.findCounter(INVALID_ROW_COUNT).getValue());
+
+            }
+        }
+    }
+
+    private HashMap<String, ArrayList<String>> testWithViewsAndIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String viewName1, String v1_indexName1, String v1_indexName2, String viewName2, String v2_indexName1) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullViewName1 = SchemaUtil.getTableName(schemaName, viewName1);
+        String fullViewName2 = SchemaUtil.getTableName(schemaName, viewName2);
+        createTable(conn, fullTableName);
+        HashMap<String, ArrayList<String>> expected = new HashMap<>();
+        if (!createChildAfterTransform) {
+            createViewAndIndex(conn, schemaName, tableName, viewName1, v1_indexName1);
+            createViewAndIndex(conn, schemaName, tableName, viewName1, v1_indexName2);
+            createViewAndIndex(conn, schemaName, tableName, viewName2, v2_indexName1);
+            expected.putAll(populateView(conn, fullViewName1, 1,2));
+            expected.putAll(populateView(conn, fullViewName2, 10,2));
+        }
+
+        // Create another hbase table and add 1 more row
+        String newTableName = "NEW_TBL_" + generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getAdmin()) {
+            String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"),
+                        Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("VIEW_COL1"),
+                        Bytes.toBytes("VIEW_COL1_3"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("VIEW_COL2"),
+                        Bytes.toBytes("VIEW_COL2_3"));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4", "VIEW_COL1_3", "VIEW_COL2_3"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + " ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        if (!createChildAfterTransform) {
+            assertTrue(rs1.next());
+        }
+
+        // Rename table to point to hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, newTableName);
+
+        conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+        if (createChildAfterTransform) {
+            createViewAndIndex(conn, schemaName, tableName, viewName1, v1_indexName1);
+            createViewAndIndex(conn, schemaName, tableName, viewName1, v1_indexName2);
+            createViewAndIndex(conn, schemaName, tableName, viewName2, v2_indexName1);
+            expected.putAll(populateView(conn, fullViewName1, 1,2));
+            expected.putAll(populateView(conn, fullViewName2, 10,2));
+        }
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);
+        return expected;
+    }
+
+
+    private PhoenixTestBuilder.SchemaBuilder createGlobalViewAndTenantView() throws Exception {
+        int numOfRows = 5;
+        PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions = PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+        tableOptions.setTableProps(" MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0 " + this.dataTableDdl);
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+                PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions tenantViewOptions = new PhoenixTestBuilder.SchemaBuilder.TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions.setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        PhoenixTestBuilder.SchemaBuilder.OtherOptions testCaseWhenAllCFMatchAndAllDefault = new PhoenixTestBuilder.SchemaBuilder.OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // Define the test schema.
+        PhoenixTestBuilder.SchemaBuilder schemaBuilder = null;
+        if (!createChildAfterTransform) {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+        } else {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            schemaBuilder.withTableOptions(tableOptions).build();
+        }
+
+        PTable table = schemaBuilder.getBaseTable();
+        String schemaName = table.getSchemaName().getString();
+        String tableName = table.getTableName().getString();
+        String newBaseTableName = "NEW_TBL_" + tableName;
+        String fullNewBaseTableName = SchemaUtil.getTableName(schemaName, newBaseTableName);
+        String fullTableName = table.getName().getString();
+
+        try (Connection conn = getConnection(props)) {
+
+            try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+                String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+                admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+                admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewBaseTableName));
+            }
+
+            renameAndDropPhysicalTable(conn, null, schemaName, tableName, newBaseTableName);
+        }
+
+        // TODO: this still creates a new table.
+        if (createChildAfterTransform) {
+            schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+            schemaBuilder.withDataOptions(schemaBuilder.getDataOptions())
+                    .withTableOptions(tableOptions)
+                    .withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+        }
+
+        // Define the test data.
+        PhoenixTestBuilder.DataSupplier dataSupplier = new PhoenixTestBuilder.DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format(ViewTTLIT.ID_FMT, rowIndex);
+                String zid = String.format(ViewTTLIT.ZID_FMT, rowIndex);
+                String col4 = String.format(ViewTTLIT.COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col5 = String.format(ViewTTLIT.COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(ViewTTLIT.COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(ViewTTLIT.COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(ViewTTLIT.COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(ViewTTLIT.COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        PhoenixTestBuilder.DataWriter dataWriter = new PhoenixTestBuilder.BasicDataWriter();
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+
+        try (Connection tenantConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            tenantConnection.setAutoCommit(true);
+            dataWriter.setConnection(tenantConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            dataWriter.upsertRows(1, numOfRows);
+            com.google.common.collect.Table<String, String, Object> upsertedData = dataWriter.getDataTable();
+
+            PhoenixTestBuilder.DataReader dataReader = new PhoenixTestBuilder.BasicDataReader();
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            dataReader.setConnection(tenantConnection);
+            dataReader.readRows();
+            com.google.common.collect.Table<String, String, Object> fetchedData = dataReader.getDataTable();
+            assertNotNull("Fetched data should not be null", fetchedData);
+            ViewTTLIT.verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+
+        }
+        return schemaBuilder;
+    }
+
+    @Test
+    public void testWith2LevelViewsBaseTablePhysicalNameChange() throws Exception {
+        // TODO: use namespace in one of the cases
+        PhoenixTestBuilder.SchemaBuilder schemaBuilder = createGlobalViewAndTenantView();
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithViews() throws Exception {
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                String schemaName = "S_" + generateUniqueName();
+                String tableName = "TBL_" + generateUniqueName();
+                String view1Name = "VW1_" + generateUniqueName();
+                String view1IndexName1 = "VW1IDX1_" + generateUniqueName();
+                String view1IndexName2 = "VW1IDX2_" + generateUniqueName();
+                String fullView1IndexName1 = SchemaUtil.getTableName(schemaName, view1IndexName1);
+                String fullView1IndexName2 = SchemaUtil.getTableName(schemaName, view1IndexName2);
+                String view2Name = "VW2_" + generateUniqueName();
+                String view2IndexName1 = "VW2IDX1_" + generateUniqueName();
+                String fullView1Name = SchemaUtil.getTableName(schemaName, view1Name);
+                String fullView2Name = SchemaUtil.getTableName(schemaName, view2Name);
+                String fullView2IndexName1 = SchemaUtil.getTableName(schemaName, view2IndexName1);
+
+                HashMap<String, ArrayList<String>> expected = testWithViewsAndIndex_BaseTableChange(conn, conn2, schemaName, tableName, view1Name, view1IndexName1, view1IndexName2, view2Name, view2IndexName1);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, view1Name, view1IndexName1);
+                IndexToolIT.runIndexTool(true, false, schemaName, view1Name, view1IndexName2);
+                IndexToolIT.runIndexTool(true, false, schemaName, view2Name, view2IndexName1);
+
+                SingleCellIndexIT.dumpTable("_IDX_" + SchemaUtil.getTableName(schemaName, tableName));

Review comment:
       please remove dumpTable
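For context on why reviewers ask for these calls to be removed or gated: an unconditional dump runs on every parameterized test run. A minimal, self-contained sketch of the alternative (a debug-gated dump) is below; `DumpGateSketch`, `maybeDump`, and the `debugEnabled` flag are hypothetical stand-ins for `SingleCellIndexIT.dumpTable` and the slf4j `LOGGER.isDebugEnabled()` check, not Phoenix's actual API.

```java
// Hypothetical sketch: keep a verbose table dump available for local
// debugging while leaving routine test runs quiet. The flag stands in
// for LOGGER.isDebugEnabled(); maybeDump stands in for dumpTable.
public class DumpGateSketch {
    static boolean debugEnabled = false;

    static String maybeDump(String tableName) {
        // Only produce the (potentially expensive) dump when debugging is on.
        if (debugEnabled) {
            return "DUMP " + tableName;
        }
        return "";
    }

    public static void main(String[] args) {
        System.out.println(maybeDump("NEW_TBL_T1")); // quiet by default
        debugEnabled = true;
        System.out.println(maybeDump("NEW_TBL_T1"));
    }
}
```

The same shape applies to each `dumpTable` call flagged in this review: either delete it before merge or wrap it in the debug check.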

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
##########
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.curator.shaded.com.google.common.base.Joiner;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class LogicalTableNameIT extends BaseTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogicalTableNameIT.class);
+
+    private final boolean createChildAfterTransform;
+    private final boolean immutable;
+    private String dataTableDdl;
+    public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
+    private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60 * 60 * 1000)); // An hour
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    public LogicalTableNameIT(boolean createChildAfterTransform, boolean immutable) {
+        this.createChildAfterTransform = createChildAfterTransform;
+        this.immutable = immutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (immutable) {
+            optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+        }
+        this.dataTableDdl = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(
+            name = "createChildAfterTransform={0}, immutable={1}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[] { false, true };
+        for (boolean immutable : Booleans) {
+            for (boolean createAfter : Booleans) {
+                list.add(new Object[] { createAfter, immutable });
+            }
+        }
+
+        return list;
+    }
+
+    private Connection getConnection(Properties props) throws Exception {
+        props.setProperty(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Force real driver to be used as the test one doesn't handle creating
+        // more than one ConnectionQueryService
+        props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, StringUtil.EMPTY_STRING);
+        // Create new ConnectionQueryServices so that we can set DROP_METADATA_ATTRIB
+        String url = QueryUtil.getConnectionUrl(props, config, "PRINCIPAL");
+        return DriverManager.getConnection(url, props);
+    }
+
+    private HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        createTable(conn, fullTableName);
+        if (!createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table and add 1 more row
+        String newTableName = NEW_TABLE_PREFIX + tableName;
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + " ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, newTableName);
+
+        if (createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, tableName, indexName);
+
+                validateTable(conn, fullTableName);
+                validateTable(conn2, fullTableName);
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // Add row and check
+                populateTable(conn, fullTableName, 10, 1);
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(true, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE PK1='PK10'");
+                assertEquals(true, rs.next());
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, NEW_TABLE_PREFIX + tableName));
+                // Drop row and check
+                conn.createStatement().execute("DELETE from " + fullTableName + " WHERE PK1='PK10'");
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(false, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE PK1='PK10'");
+                assertEquals(false, rs.next());
+
+                conn2.createStatement().execute("DROP TABLE " + fullTableName);
+                // check that the physical data table is dropped
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                assertEquals(false, admin.tableExists(TableName.valueOf(SchemaUtil.getTableName(schemaName, NEW_TABLE_PREFIX + tableName))));
+
+                // check that the index is dropped
+                assertEquals(false, admin.tableExists(TableName.valueOf(fullIndexName)));
+
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex_runScrutiny() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, indexName));
+                List<Job> completedJobs =
+                        IndexScrutinyToolBaseIT.runScrutinyTool(schemaName, tableName, indexName, 1L,
+                                IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+
+                Job job = completedJobs.get(0);
+                assertTrue(job.isSuccessful());
+
+                Counters counters = job.getCounters();
+                if (createChildAfterTransform) {
+                    assertEquals(3, counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(0, counters.findCounter(INVALID_ROW_COUNT).getValue());
+                } else {
+                    // Since we didn't build the index, we expect 1 missing index row
+                    assertEquals(2, counters.findCounter(VALID_ROW_COUNT).getValue());
+                    assertEquals(1, counters.findCounter(INVALID_ROW_COUNT).getValue());
+                }
+            }
+        }
+    }
+
+    private HashMap<String, ArrayList<String>> test_IndexTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName, byte[] verifiedBytes) throws Exception {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        conn.setAutoCommit(true);
+        createTable(conn, fullTableName);
+        createIndexOnTable(conn, fullTableName, indexName);
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table for index and add 1 more row
+        String newTableName = "NEW_IDXTBL_" + generateUniqueName();
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new StringBuilder(indexName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullIndexName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("V13"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        verifiedBytes);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT * FROM " + fullIndexName;
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, indexName, newTableName);
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);

Review comment:
       Can remove dumpTable here

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
##########
@@ -0,0 +1,793 @@
+package org.apache.phoenix.end2end;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.*;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.printResultSet;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class LogicalTableNameIT extends ParallelStatsDisabledIT {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogicalTableNameIT.class);
+
+    private final boolean createChildAfterTransform;
+    private final boolean immutable;
+    private String dataTableDdl;
+    public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
+    private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3000));
+        // When we run all tests together we are using the global cluster (driver),
+        // so to make drop work we need to re-register the driver with the DROP_METADATA_ATTRIB property
+        destroyDriver();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        // Register the real Phoenix driver to have multiple ConnectionQueryServices created across
+        // connections, so that metadata changes don't get propagated across connections
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    public LogicalTableNameIT(boolean createChildAfterTransform, boolean immutable) {
+        this.createChildAfterTransform = createChildAfterTransform;
+        this.immutable = immutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (immutable) {
+            optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+        }
+        this.dataTableDdl = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(
+            name = "createChildAfterTransform={0}, immutable={1}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[] { false, true };
+        for (boolean immutable : Booleans) {
+            for (boolean createAfter : Booleans) {
+                list.add(new Object[] { createAfter, immutable });
+            }
+        }
+
+        return list;
+    }
+
+    private Connection getConnection(Properties props) throws Exception {
+        props.setProperty(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Force real driver to be used as the test one doesn't handle creating
+        // more than one ConnectionQueryService
+        props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, StringUtil.EMPTY_STRING);
+        // Create new ConnectionQueryServices so that we can set DROP_METADATA_ATTRIB
+        String url = QueryUtil.getConnectionUrl(props, config, "PRINCIPAL");
+        return DriverManager.getConnection(url, props);
+    }
+
+    private HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        createTable(conn, fullTableName);
+        if (!createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table and add 1 more row
+        String newTableName = NEW_TABLE_PREFIX + tableName;
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + 
" ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, 
newTableName);
+
+        if (createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);

Review comment:
       Before merging could you please remove?

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
##########
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.curator.shaded.com.google.common.base.Joiner;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class LogicalTableNameIT extends BaseTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogicalTableNameIT.class);
+
+    private final boolean createChildAfterTransform;
+    private final boolean immutable;
+    private String dataTableDdl;
+    public static final String NEW_TABLE_PREFIX = "NEW_TBL_";
+    private Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60*1000)); // An hour
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    public LogicalTableNameIT(boolean createChildAfterTransform, boolean immutable) {
+        this.createChildAfterTransform = createChildAfterTransform;
+        this.immutable = immutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (immutable) {
+            optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+        }
+        this.dataTableDdl = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(
+            name = "createChildAfterTransform={0}, immutable={1}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[] { false, true };
+        for (boolean immutable : Booleans) {
+            for (boolean createAfter : Booleans) {
+                list.add(new Object[] { createAfter, immutable });
+            }
+        }
+
+        return list;
+    }
+
+    private Connection getConnection(Properties props) throws Exception {
+        props.setProperty(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Force real driver to be used as the test one doesn't handle creating
+        // more than one ConnectionQueryService
+        props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, StringUtil.EMPTY_STRING);
+        // Create new ConnectionQueryServices so that we can set DROP_METADATA_ATTRIB
+        String url = QueryUtil.getConnectionUrl(props, config, "PRINCIPAL");
+        return DriverManager.getConnection(url, props);
+    }
+
+    private HashMap<String, ArrayList<String>> testBaseTableWithIndex_BaseTableChange(Connection conn, Connection conn2, String schemaName, String tableName, String indexName) throws Exception {
+        conn.setAutoCommit(true);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        createTable(conn, fullTableName);
+        if (!createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+        HashMap<String, ArrayList<String>> expected = populateTable(conn, fullTableName, 1, 2);
+
+        // Create another hbase table and add 1 more row
+        String newTableName = NEW_TABLE_PREFIX + tableName;
+        String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+        try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String snapshotName = new StringBuilder(fullTableName).append("-Snapshot").toString();
+            admin.snapshot(snapshotName, TableName.valueOf(fullTableName));
+            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(fullNewTableName));
+
+            try (HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullNewTableName))) {
+                Put put = new Put(ByteUtil.concat(Bytes.toBytes("PK3")));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("V13"));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"),
+                        PInteger.INSTANCE.toBytes(3));
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V3"),
+                        PInteger.INSTANCE.toBytes(4));
+                htable.put(put);
+                expected.put("PK3", Lists.newArrayList("PK3", "V13", "3", "4"));
+            }
+        }
+
+        // Query to cache on the second connection
+        String selectTable1 = "SELECT PK1, V1, V2, V3 FROM " + fullTableName + " ORDER BY PK1 DESC";
+        ResultSet rs1 = conn2.createStatement().executeQuery(selectTable1);
+        assertTrue(rs1.next());
+
+        // Rename table to point to the new hbase table
+        renameAndDropPhysicalTable(conn, "NULL", schemaName, tableName, newTableName);
+
+        if (createChildAfterTransform) {
+            createIndexOnTable(conn, fullTableName, indexName);
+        }
+
+        SingleCellIndexIT.dumpTable(fullNewTableName);
+        return expected;
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                HashMap<String, ArrayList<String>> expected = testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                // We have to rebuild index for this to work
+                IndexToolIT.runIndexTool(true, false, schemaName, tableName, indexName);
+
+                validateTable(conn, fullTableName);
+                validateTable(conn2, fullTableName);
+                validateIndex(conn, fullIndexName, false, expected);
+                validateIndex(conn2, fullIndexName, false, expected);
+
+                // Add row and check
+                populateTable(conn, fullTableName, 10, 1);
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(true, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE PK1='PK10'");
+                assertEquals(true, rs.next());
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, NEW_TABLE_PREFIX + tableName));
+                // Drop row and check
+                conn.createStatement().execute("DELETE from " + fullTableName + " WHERE PK1='PK10'");
+                rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullIndexName + " WHERE \":PK1\"='PK10'");
+                assertEquals(false, rs.next());
+                rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE PK1='PK10'");
+                assertEquals(false, rs.next());
+
+                conn2.createStatement().execute("DROP TABLE " + fullTableName);
+                // check that the physical data table is dropped
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                assertEquals(false, admin.tableExists(TableName.valueOf(SchemaUtil.getTableName(schemaName, NEW_TABLE_PREFIX + tableName))));
+
+                // check that index is dropped
+                assertEquals(false, admin.tableExists(TableName.valueOf(fullIndexName)));
+
+            }
+        }
+    }
+
+    @Test
+    public void testUpdatePhysicalTableNameWithIndex_runScrutiny() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        try (Connection conn = getConnection(props)) {
+            try (Connection conn2 = getConnection(props)) {
+                testBaseTableWithIndex_BaseTableChange(conn, conn2, schemaName, tableName, indexName);
+
+                SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, indexName));

Review comment:
       Please remove dumpTable

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
##########
@@ -1877,7 +1923,9 @@ public void createTable(RpcController controller, CreateTableRequest request,
                     cPhysicalName = parentTable.getPhysicalName().getBytes();
                     cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                 } else if (parentTable.getType() == PTableType.VIEW) {
-                    cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
+                    // Logical name of base table

Review comment:
       nit: let's specify in the comment that the physical name of the view 
index table is constructed from the logical name of the base table, so it's 
more clear what's going on here. (similar to the good comment on 2206)
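
For context, the relationship the reviewer is asking to spell out in the comment: the physical name of the view index table is derived from the *logical* name of the base table by prepending the `_IDX_` prefix (in Phoenix this derivation lives in `MetaDataUtil.getViewIndexPhysicalName`). A minimal standalone sketch of that derivation; the class and helper names below are illustrative stand-ins, not the actual Phoenix internals:

```java
public class ViewIndexNameSketch {
    // Phoenix prefixes view index physical tables with "_IDX_"; the real
    // constant and helper live in MetaDataUtil in the Phoenix code base.
    private static final String VIEW_INDEX_TABLE_PREFIX = "_IDX_";

    // Illustrative stand-in for MetaDataUtil.getViewIndexPhysicalName:
    // the view index physical name is built from the LOGICAL name of the
    // base table, not from whichever HBase table it currently maps to.
    static String viewIndexPhysicalName(String baseTableLogicalName) {
        return VIEW_INDEX_TABLE_PREFIX + baseTableLogicalName;
    }

    public static void main(String[] args) {
        System.out.println(viewIndexPhysicalName("S.TBL")); // _IDX_S.TBL
    }
}
```

Because the derivation uses the logical name, repointing the base table's physical name does not move the view index table, which is why this code path must pass the logical rather than physical name.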




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Change SYSTEM.CATALOG to allow separation of physical name (Hbase name) from 
> logical name (Phoenix name)
> --------------------------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-6247
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6247
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Gokcen Iskender
>            Assignee: Gokcen Iskender
>            Priority: Major
>
> Currently, tables in Phoenix have the same name as the underlying HBase 
> table. Separating the logical and physical table names, i.e. having a 
> Phoenix table point to an HBase table with a different name, has some 
> advantages. 
> An example: say we want a different storage/encoding scheme for an index. 
> We can build the new index while clients continue to use the old one, and 
> once the rebuild finishes, we can repoint the logical name to the new 
> index table with almost no downtime or performance impact. From the 
> client's perspective it is the same index with the same name, but the 
> physical table underneath is different. Today, changing an index like this 
> requires dropping and re-creating it, which means downtime for the index, 
> and queries fall back to full scans of the data table (hurting 
> performance) while the index is being rebuilt.
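
The indirection the issue proposes can be pictured as a single extra mapping in SYSTEM.CATALOG: clients resolve the logical (Phoenix) name, and the physical (HBase) name it maps to can be swapped underneath them. The toy model below is only a sketch of that idea; the real change involves the SYSTEM.CATALOG schema and metadata cache, not an in-memory map:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the proposed separation: the logical (Phoenix) name is the
// stable key clients use; the physical (HBase) name it resolves to can be
// repointed without the client noticing.
public class LogicalNameCatalogSketch {
    private final Map<String, String> logicalToPhysical = new HashMap<>();

    void createTable(String logicalName) {
        // Initially the physical name equals the logical name, as today.
        logicalToPhysical.put(logicalName, logicalName);
    }

    // The "momentary repoint" from the description: after the new index or
    // table is fully built, swap the mapping in one metadata update.
    void repoint(String logicalName, String newPhysicalName) {
        logicalToPhysical.put(logicalName, newPhysicalName);
    }

    String resolve(String logicalName) {
        return logicalToPhysical.get(logicalName);
    }

    public static void main(String[] args) {
        LogicalNameCatalogSketch catalog = new LogicalNameCatalogSketch();
        catalog.createTable("S.TBL");
        catalog.repoint("S.TBL", "S.NEW_TBL_TBL");    // swap after rebuild
        System.out.println(catalog.resolve("S.TBL")); // clients still use S.TBL
    }
}
```

This mirrors what LogicalTableNameIT exercises above: clone the data to a new HBase table (snapshot + clone), rebuild the index there, then repoint the logical name via the test helper `renameAndDropPhysicalTable`.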



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
