shahrs87 commented on code in PR #1726:
URL: https://github.com/apache/phoenix/pull/1726#discussion_r1384016449


##########
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java:
##########
@@ -243,6 +247,12 @@ public MutationState(TableRef table, MultiRowMutationState mutations, long sizeO
         throwIfTooBig();
     }
 
+    private boolean getValidateLastDdlTimestampEnabled() {

Review Comment:
   Do you want to move this method to the util class that you created? 
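   Something like this, as a rough sketch (the config key and default constant names below are placeholders for whatever this PR defines):
   ```java
   // In ValidateLastDDLTimestampUtil (sketch; constant names are assumptions)
   public static boolean getValidateLastDdlTimestampEnabled(PhoenixConnection connection) {
       // read the client-side feature flag from the connection's configuration
       return connection.getQueryServices().getConfiguration().getBoolean(
               QueryServices.PHOENIX_VALIDATE_LAST_DDL_TIMESTAMP_ENABLED,   // hypothetical key constant
               QueryServicesOptions.DEFAULT_PHOENIX_VALIDATE_LAST_DDL_TIMESTAMP_ENABLED); // hypothetical default
   }
   ```
   Then MutationState (and any other caller) can just delegate to the util method.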



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java:
##########
@@ -0,0 +1,170 @@
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ValidateLastDDLTimestampUtil.class);
+
+    public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Tenant: %s, ", tenantId));
+        for (TableRef tableRef : tableRefs) {
+            sb.append(String.format("{Schema: %s, Table: %s},",
+                    tableRef.getTable().getSchemaName(),
+                    tableRef.getTable().getTableName()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Verifies that table metadata for given tables is up-to-date in client cache with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRefs
+     * @throws SQLException
+     */
+    public static void validateLastDDLTimestamp(
+            PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry)
+            throws SQLException {
+
+        String infoString = getInfoString(conn.getTenantId(), tableRefs);
+        try (Admin admin = conn.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = conn.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+                    infoString, regionServer);
+
+            // RPC
+            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request
+                    = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath);
+            service.validateLastDDLTimestamp(null, request);
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                conn.getQueryServices().refreshLiveRegionServers();
+                validateLastDDLTimestamp(conn, tableRefs, isWritePath, false);
+                return;
+            }
+            throw parsedException;
+        }
+    }
+
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC for the given tables.
+     * 1. For a view, we need to add all its ancestors to the request in case something changed in the hierarchy.
+     * 2. For an index, we need to add its parent table to the request in case the index was dropped.
+     * 3. On the write path, we need to add all indexes of a table/view in case index state was changed.
+     * @param tableRefs
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+    getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs,
+                                        boolean isWritePath) throws TableNotFoundException {
+
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder;
+
+        for (TableRef tableRef : tableRefs) {
+             innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();

Review Comment:
   Do you want to move this initialization of innerBuilder inside the if condition on line#112?
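   i.e., something like this sketch, so the builder is only created where it is used:
   ```java
   if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
       // build a validation request for the parent table of the index
       RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
               = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
       PTableKey key = new PTableKey(conn.getTenantId(),
               tableRef.getTable().getParentName().getString());
       PTable parentTable = conn.getTable(key);
       setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable);
       requestBuilder.addLastDDLTimestampRequests(innerBuilder);
   }
   ```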



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java:
##########
@@ -0,0 +1,170 @@
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ValidateLastDDLTimestampUtil.class);
+
+    public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Tenant: %s, ", tenantId));
+        for (TableRef tableRef : tableRefs) {
+            sb.append(String.format("{Schema: %s, Table: %s},",
+                    tableRef.getTable().getSchemaName(),
+                    tableRef.getTable().getTableName()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Verifies that table metadata for given tables is up-to-date in client cache with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRefs
+     * @throws SQLException
+     */
+    public static void validateLastDDLTimestamp(
+            PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry)
+            throws SQLException {
+
+        String infoString = getInfoString(conn.getTenantId(), tableRefs);
+        try (Admin admin = conn.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = conn.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+                    infoString, regionServer);
+
+            // RPC
+            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request
+                    = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath);
+            service.validateLastDDLTimestamp(null, request);
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                conn.getQueryServices().refreshLiveRegionServers();
+                validateLastDDLTimestamp(conn, tableRefs, isWritePath, false);
+                return;
+            }
+            throw parsedException;
+        }
+    }
+
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC for the given tables.
+     * 1. For a view, we need to add all its ancestors to the request in case something changed in the hierarchy.
+     * 2. For an index, we need to add its parent table to the request in case the index was dropped.
+     * 3. On the write path, we need to add all indexes of a table/view in case index state was changed.
+     * @param tableRefs
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+    getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs,
+                                        boolean isWritePath) throws TableNotFoundException {
+
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder;
+
+        for (TableRef tableRef : tableRefs) {
+             innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+
+            //when querying an index, we need to validate its parent table
+            //in case the index was dropped
+            if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
+                PTableKey key = new PTableKey(conn.getTenantId(),
+                        tableRef.getTable().getParentName().getString());
+                PTable parentTable = conn.getTable(key);
+                setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable);
+                requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+            }
+
+            // add the tableRef to the request
+            innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+            setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), tableRef.getTable());
+            requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+
+            //when querying a view, we need to validate last ddl timestamps for all its ancestors
+            if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
+                PTable pTable = tableRef.getTable();
+                while (pTable.getParentName() != null) {
+                    PTableKey key = new PTableKey(conn.getTenantId(),
+                            pTable.getParentName().getString());
+                    PTable parentTable = conn.getTable(key);
+                    innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                    setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable);
+                    requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+                    pTable = parentTable;
+                }
+            }
+
+            //on the write path, we need to validate all indexes of a table/view
+            //in case index state was changed
+            if (isWritePath) {
+                for (PTable idxPTable : tableRef.getTable().getIndexes()) {
+                    innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                    setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), idxPTable);
+                    requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+                }
+            }
+        }
+

Review Comment:
   nit: extra line.



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java:
##########
@@ -0,0 +1,170 @@
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ValidateLastDDLTimestampUtil.class);
+
+    public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Tenant: %s, ", tenantId));
+        for (TableRef tableRef : tableRefs) {
+            sb.append(String.format("{Schema: %s, Table: %s},",
+                    tableRef.getTable().getSchemaName(),
+                    tableRef.getTable().getTableName()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Verifies that table metadata for given tables is up-to-date in client cache with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRefs
+     * @throws SQLException
+     */
+    public static void validateLastDDLTimestamp(
+            PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry)
+            throws SQLException {
+
+        String infoString = getInfoString(conn.getTenantId(), tableRefs);
+        try (Admin admin = conn.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = conn.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+                    infoString, regionServer);
+
+            // RPC
+            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request
+                    = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath);
+            service.validateLastDDLTimestamp(null, request);
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                conn.getQueryServices().refreshLiveRegionServers();
+                validateLastDDLTimestamp(conn, tableRefs, isWritePath, false);
+                return;
+            }
+            throw parsedException;
+        }
+    }
+
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC for the given tables.
+     * 1. For a view, we need to add all its ancestors to the request in case something changed in the hierarchy.
+     * 2. For an index, we need to add its parent table to the request in case the index was dropped.
+     * 3. On the write path, we need to add all indexes of a table/view in case index state was changed.
+     * @param tableRefs
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+    getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs,
+                                        boolean isWritePath) throws TableNotFoundException {

Review Comment:
   Add isWritePath to the method's javadoc.
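   For example, a sketch of the wording:
   ```java
   /**
    * ...
    * @param conn connection whose cached PTables are used to resolve parents and indexes
    * @param tableRefs tables for which to build the request
    * @param isWritePath whether this is a mutation, in which case all indexes are also added
    * @return ValidateLastDDLTimestampRequest for the tables in tableRefs
    */
   ```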



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java:
##########
@@ -911,7 +913,330 @@ public void testSelectQueryDropIndex() throws Exception {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "v3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test upserts into a multi-level view hierarchy.
+     */
+    @Test
+    public void testUpsertViewWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates a table and views
+            createTable(conn1, tableName, NEVER);
+            createView(conn1, tableName, viewName1);
+            createView(conn1, viewName1, viewName2);
+
+            //client-2 populates its cache, 1 getTable RPC each for table, view1, view2
+            query(conn2, viewName2);
+
+            //client-1 alters first level view
+            alterViewAddColumn(conn1, viewName1, "v3");
+
+            //client-2 upserts into second level view
+            //verify there was a getTable RPC for the view and all its ancestors
+            upsert(conn2, viewName2, true);
+
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+                    anyLong(), anyLong());
+
+            //client-2 upserts into first level view
+            //verify no getTable RPCs
+            upsert(conn2, viewName1, true);
+
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test that upserts into a table which was dropped throws a TableNotFoundException.
+     */
+    @Test
+    public void testUpsertDroppedTable() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 drops the table
+            conn2.createStatement().execute("DROP TABLE " + tableName);
+
+            //client-1 commits
+            conn1.commit();
+            Assert.fail("Commit should have failed with TableNotFoundException");
+        }
+        catch (Exception e) {
+            Assert.assertTrue("TableNotFoundException was not thrown when table was dropped concurrently with upserts.", e instanceof TableNotFoundException);

Review Comment:
   nit: this line exceeds the line length limit.
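   e.g. it can be broken up like this:
   ```java
   Assert.assertTrue("TableNotFoundException was not thrown when table was "
           + "dropped concurrently with upserts.", e instanceof TableNotFoundException);
   ```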



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java:
##########
@@ -911,7 +913,330 @@ public void testSelectQueryDropIndex() throws Exception {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "v3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test upserts into a multi-level view hierarchy.
+     */
+    @Test
+    public void testUpsertViewWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates a table and views
+            createTable(conn1, tableName, NEVER);
+            createView(conn1, tableName, viewName1);
+            createView(conn1, viewName1, viewName2);
+
+            //client-2 populates its cache, 1 getTable RPC each for table, view1, view2
+            query(conn2, viewName2);

Review Comment:
   Can we verify that a getTable call was made for tableName, viewName1 and viewName2?
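   e.g. something like this right after the query (times(1) assuming a cold cache):
   ```java
   Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
           any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
           anyLong(), anyLong());
   Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
           any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
           anyLong(), anyLong());
   Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
           any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
           anyLong(), anyLong());
   ```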



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java:
##########
@@ -911,7 +913,330 @@ public void testSelectQueryDropIndex() throws Exception {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "v3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test upserts into a multi-level view hierarchy.
+     */
+    @Test
+    public void testUpsertViewWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates a table and views
+            createTable(conn1, tableName, NEVER);
+            createView(conn1, tableName, viewName1);
+            createView(conn1, viewName1, viewName2);
+
+            //client-2 populates its cache, 1 getTable RPC each for table, view1, view2
+            query(conn2, viewName2);
+
+            //client-1 alters first level view
+            alterViewAddColumn(conn1, viewName1, "v3");

Review Comment:
   Can we rename `v3` to `col3`? Otherwise it's confusing whether it is a view name or a column name.



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java:
##########
@@ -0,0 +1,170 @@
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ValidateLastDDLTimestampUtil.class);
+
+    public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Tenant: %s, ", tenantId));
+        for (TableRef tableRef : tableRefs) {
+            sb.append(String.format("{Schema: %s, Table: %s},",
+                    tableRef.getTable().getSchemaName(),
+                    tableRef.getTable().getTableName()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Verifies that table metadata for given tables is up-to-date in client cache with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRefs
+     * @throws SQLException

Review Comment:
   There are 2 more arguments in the method: `isWritePath` and `doRetry`. Please add them to the javadoc.
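   A sketch of what that could look like:
   ```java
   /**
    * ...
    * @param conn connection on whose behalf the validation is performed
    * @param tableRefs tables whose cached metadata should be validated
    * @param isWritePath true for mutations, so that indexes of each table/view are also validated
    * @param doRetry whether to retry once on errors other than StaleMetadataCacheException
    * @throws SQLException e.g. StaleMetadataCacheException if the client cache is stale
    */
   ```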



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java:
##########
@@ -911,7 +913,330 @@ public void testSelectQueryDropIndex() throws Exception {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "v3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),

Review Comment:
   How can we verify that `StaleMetadataCacheException` was thrown for `tableName2`?
   Can we introduce a counter metric that is incremented whenever we receive a StaleMetadataCacheException?
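   e.g. a global client metric bumped in the catch block, sketched below (the metric name is hypothetical; the PR would have to add it to GlobalClientMetrics):
   ```java
   if (parsedException instanceof StaleMetadataCacheException) {
       // hypothetical counter; would need to be defined in GlobalClientMetrics
       GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.increment();
       throw parsedException;
   }
   ```
   The test could then assert on the counter value instead of (or in addition to) counting getTable calls.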



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java:
##########
@@ -911,7 +913,330 @@ public void testSelectQueryDropIndex() throws Exception {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "v3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test upserts into a multi-level view hierarchy.
+     */
+    @Test
+    public void testUpsertViewWithOldDDLTimestamp() throws Exception {

Review Comment:
   Can we add a test case where we add/drop view in the view hierarchy? 
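   A sketch of such a test, reusing the helpers in this class (assuming DROP VIEW ... CASCADE drops the child view too):
   ```java
   // client-1 creates a 2-level view hierarchy, client-2 caches it
   createTable(conn1, tableName, NEVER);
   createView(conn1, tableName, viewName1);
   createView(conn1, viewName1, viewName2);
   query(conn2, viewName2);

   // client-1 drops the middle view; client-2's upsert against the
   // child view should now fail last ddl timestamp validation
   conn1.createStatement().execute("DROP VIEW " + viewName1 + " CASCADE");
   try {
       upsert(conn2, viewName2, true);
       Assert.fail("Upsert should have failed with TableNotFoundException");
   } catch (Exception e) {
       Assert.assertTrue(e instanceof TableNotFoundException);
   }
   ```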



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

