shahrs87 commented on code in PR #1666:
URL: https://github.com/apache/phoenix/pull/1666#discussion_r1327524169


##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java:
##########
@@ -421,7 +514,20 @@ private PhoenixResultSet executeQuery(final 
CompilableStatement stmt,
                                     }
                                 }
                                 throw e;
-                            } catch (RuntimeException e) {
+                            } catch (SQLException e) {
+                                // force update cache if 
StaleMetadataCacheException and retry
+                                if (e instanceof StaleMetadataCacheException) {
+                                    String planSchemaName = 
getLastQueryPlan().getTableRef().getTable().getSchemaName().toString();
+                                    String planTableName = 
getLastQueryPlan().getTableRef().getTable().getTableName().toString();
+                                    // update cache
+                                    new 
MetaDataClient(connection).updateCache(connection.getTenantId(), 
planSchemaName, planTableName, true);
+                                    // skip last ddl timestamp validation in 
the retry
+                                    setValidateLastDdlTimestamp(false);

Review Comment:
   Would this create a problem? If we are re-using the same statement for 
different queries, then once we hit this condition the value of 
`validateLastDdlTimestamp` will always be false for all subsequent queries.



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java:
##########
@@ -325,6 +337,80 @@ protected PhoenixResultSet executeQuery(final 
CompilableStatement stmt, final Qu
         return executeQuery(stmt, true, queryLogger, noCommit);
     }
 
+    private String getInfoString(TableRef tableRef) {
+        return String.format("Tenant: %s, Schema: %s, Table: %s",
+                this.connection.getTenantId(),
+                tableRef.getTable().getSchemaName(),
+                tableRef.getTable().getTableName());
+    }
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC.
+     * @param tableRef
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest 
getValidateDDLTimestampRequest(TableRef tableRef) {
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder 
requestBuilder
+                = 
RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
+                = 
RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+
+        byte[] tenantIDBytes = this.connection.getTenantId() == null
+                                    ? HConstants.EMPTY_BYTE_ARRAY
+                                    : this.connection.getTenantId().getBytes();
+        byte[] schemaBytes = tableRef.getTable().getSchemaName() == null
+                                    ? HConstants.EMPTY_BYTE_ARRAY
+                                    : 
tableRef.getTable().getSchemaName().getBytes();
+
+        innerBuilder.setTenantId(ByteStringer.wrap(tenantIDBytes));
+        innerBuilder.setSchemaName(ByteStringer.wrap(schemaBytes));
+        
innerBuilder.setTableName(ByteStringer.wrap(tableRef.getTable().getTableName().getBytes()));
+        
innerBuilder.setLastDDLTimestamp(tableRef.getTable().getLastDDLTimestamp());
+        requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+        return  requestBuilder.build();
+    }
+
+    /**
+     * Verifies that table metadata in client cache is up-to-date with server.
+     * A random live region server is picked for invoking the RPC to validate 
LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw 
the Exception.
+     * @param tableRef
+     * @throws SQLException
+     */
+    private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) 
throws SQLException {
+
+        String infoString = getInfoString(tableRef);
+        try (Admin admin = this.connection.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers = new 
ArrayList<>(admin.getRegionServers(true));
+            // pick one at random
+            ServerName regionServer
+                    = 
regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to 
regionserver {}",
+                    infoString, regionServer.toString());
+
+            // RPC
+            CoprocessorRpcChannel channel = 
admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            service.validateLastDDLTimestamp(null, 
getValidateDDLTimestampRequest(tableRef));
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than 
StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}: {}",

Review Comment:
   ```suggestion
               LOGGER.error("Error in validating DDL timestamp for {}",
                      infoString, parsedException);
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java:
##########
@@ -421,7 +514,20 @@ private PhoenixResultSet executeQuery(final 
CompilableStatement stmt,
                                     }
                                 }
                                 throw e;
-                            } catch (RuntimeException e) {
+                            } catch (SQLException e) {

Review Comment:
   Instead of catching SQLException, can we catch StaleMetadataCacheException? 



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class ServerMetadataCachingIT extends BaseTest {
+
+    private final Random RANDOM = new Random(42);

Review Comment:
   Why the lucky number 42?



##########
phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java:
##########
@@ -313,6 +313,9 @@ public interface QueryServices extends SQLCloseable {
     //Update Cache Frequency default config attribute
     public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB  = 
"phoenix.default.update.cache.frequency";
 
+    // whether to validate last ddl timestamps during client operations
+    public static final String LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = 
"phoenix.ddl.timestamp.validation.enabled";

Review Comment:
   Then you would need to add this to BaseTest?
   ```
           props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
                   PhoenixRegionServerEndpoint.class.getName());
   ```



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class ServerMetadataCachingIT extends BaseTest {
+
+    private final Random RANDOM = new Random(42);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                PhoenixRegionServerEndpoint.class.getName());
+        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, 
Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    private void createTable(Connection conn, String tableName, long 
updateCacheFrequency) throws SQLException {
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)"
+                + (updateCacheFrequency == 0 ? "" : 
"UPDATE_CACHE_FREQUENCY="+updateCacheFrequency));
+    }
+
+    private void upsert(Connection conn, String tableName) throws SQLException 
{
+        conn.createStatement().execute("UPSERT INTO " + tableName +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + 
RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
+
+    private void query(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName);
+        rs.next();
+    }
+
+    private void alterTableAddColumn(Connection conn, String tableName, String 
columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF 
NOT EXISTS "
+                + columnName + " INTEGER");
+    }
+
+    /**
+     * Client-1 creates a table, upserts data and alters the table.
+     * Client-2 queries the table before and after the alter.
+     * Check queries work successfully in both cases and verify number of 
getTable RPCs.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestamp() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumGetTableRPCs;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table with UCF=never and upsert data using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // select query from client-2 works to populate client side 
metadata cache
+            // there should be 1 getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+
+            // add column using client-1 to update last ddl timestamp
+            alterTableAddColumn(conn1, tableName, "newCol1");
+
+            // invalidate region server cache
+            ServerMetadataCache.resetCache();
+
+            // select query from client-2 with old ddl timestamp works
+            // there should be one more getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs += 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+
+            // select query from client-2 with latest ddl timestamp works
+            // there should be no more getTable RPCs
+            query(conn2, tableName);
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test DDL timestamp validation retry logic in case of SQLException from 
Admin API.
+     */
+    @Test
+    public void testSelectQueryAdminSQLExceptionInValidation() throws 
SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // instrument CQSI to throw a SQLException once when getAdmin is 
called
+            Mockito.doThrow(new 
SQLException()).doCallRealMethod().when(spyCqs2).getAdmin();
+
+            // query using client-2 should succeed
+            query(conn2, tableName);
+        }
+    }
+
+    /**
+     * Test DDL timestamp validation retry logic in case of IOException from 
Admin API.
+     */
+    @Test
+    public void testSelectQueryAdminIOExceptionInValidation() throws Exception 
{
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // instrument CQSI admin to throw an IOException once when 
getRegionServers() is called
+            Admin spyAdmin = Mockito.spy(spyCqs2.getAdmin());
+            Mockito.doThrow(new 
IOException()).doCallRealMethod().when(spyAdmin).getRegionServers(eq(true));
+            Mockito.doReturn(spyAdmin).when(spyCqs2).getAdmin();
+
+            // query using client-2 should succeed
+            query(conn2, tableName);
+        }
+    }
+
+    /**
+     * Test DDL timestamp validation retry logic in case of any exception
+     * from Server other than StaleMetadataCacheException.
+     */
+    @Test
+    public void testSelectQueryServerSideExceptionInValidation() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // Instrument ServerMetadataCache to throw a SQLException once
+            ServerMetadataCache spyCache = 
Mockito.spy(ServerMetadataCache.getInstance(config));
+            Mockito.doThrow(new 
SQLException()).doCallRealMethod().when(spyCache)
+                    .getLastDDLTimestampForTable(any(), any(), 
eq(Bytes.toBytes(tableName)));
+            ServerMetadataCache.setInstance(spyCache);

Review Comment:
   Are we resetting the cache? 



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })

Review Comment:
   All the tests are great, but I would like to see one more test:
   1. Make ConnectionQueryServices#getAdmin() always throw IOException. In this 
case executeQuery will throw an Exception and the query will fail.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class ServerMetadataCachingIT extends BaseTest {
+
+    private final Random RANDOM = new Random(42);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                PhoenixRegionServerEndpoint.class.getName());
+        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, 
Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    private void createTable(Connection conn, String tableName, long 
updateCacheFrequency) throws SQLException {
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)"
+                + (updateCacheFrequency == 0 ? "" : 
"UPDATE_CACHE_FREQUENCY="+updateCacheFrequency));
+    }
+
+    private void upsert(Connection conn, String tableName) throws SQLException 
{
+        conn.createStatement().execute("UPSERT INTO " + tableName +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + 
RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
+
+    private void query(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName);
+        rs.next();
+    }
+
+    private void alterTableAddColumn(Connection conn, String tableName, String 
columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF 
NOT EXISTS "
+                + columnName + " INTEGER");
+    }
+
+    /**
+     * Client-1 creates a table, upserts data and alters the table.
+     * Client-2 queries the table before and after the alter.
+     * Check queries work successfully in both cases and verify number of 
getTable RPCs.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestamp() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumGetTableRPCs;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table with UCF=never and upsert data using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);

Review Comment:
   Instead of passing `Long.MAX_VALUE`, can we pass something like 
ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER") to be more readable?



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class ServerMetadataCachingIT extends BaseTest {
+
+    private final Random RANDOM = new Random(42);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                PhoenixRegionServerEndpoint.class.getName());
+        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, 
Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    private void createTable(Connection conn, String tableName, long 
updateCacheFrequency) throws SQLException {
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)"
+                + (updateCacheFrequency == 0 ? "" : 
"UPDATE_CACHE_FREQUENCY="+updateCacheFrequency));
+    }
+
+    private void upsert(Connection conn, String tableName) throws SQLException 
{
+        conn.createStatement().execute("UPSERT INTO " + tableName +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + 
RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
+
+    private void query(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName);
+        rs.next();
+    }
+
+    private void alterTableAddColumn(Connection conn, String tableName, String 
columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF 
NOT EXISTS "
+                + columnName + " INTEGER");
+    }
+
+    /**
+     * Client-1 creates a table, upserts data and alters the table.
+     * Client-2 queries the table before and after the alter.
+     * Check queries work successfully in both cases and verify number of 
getTable RPCs.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestamp() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumGetTableRPCs;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table with UCF=never and upsert data using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // select query from client-2 works to populate client side 
metadata cache
+            // there should be 1 getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+
+            // add column using client-1 to update last ddl timestamp
+            alterTableAddColumn(conn1, tableName, "newCol1");
+
+            // invalidate region server cache
+            ServerMetadataCache.resetCache();
+
+            // select query from client-2 with old ddl timestamp works
+            // there should be one more getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs += 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))

Review Comment:
   Is there any way we can test if this query encountered 
StaleMetadataCacheException and the cache was updated? 



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java:
##########
@@ -421,7 +514,20 @@ private PhoenixResultSet executeQuery(final 
CompilableStatement stmt,
                                     }
                                 }
                                 throw e;
-                            } catch (RuntimeException e) {
+                            } catch (SQLException e) {
+                                // force update cache if 
StaleMetadataCacheException and retry
+                                if (e instanceof StaleMetadataCacheException) {
+                                    String planSchemaName = 
getLastQueryPlan().getTableRef().getTable().getSchemaName().toString();
+                                    String planTableName = 
getLastQueryPlan().getTableRef().getTable().getTableName().toString();
+                                    // update cache
+                                    new 
MetaDataClient(connection).updateCache(connection.getTenantId(), 
planSchemaName, planTableName, true);

Review Comment:
   It would be good to add a log line at debug level, including the 
infoString, to indicate that we are updating the cache.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCachingIT.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerMetadataCache;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+
+@Category({NeedsOwnMiniClusterTest.class })
+public class ServerMetadataCachingIT extends BaseTest {
+
+    private final Random RANDOM = new Random(42);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(REGIONSERVER_COPROCESSOR_CONF_KEY,
+                PhoenixRegionServerEndpoint.class.getName());
+        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, 
Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    private void createTable(Connection conn, String tableName, long 
updateCacheFrequency) throws SQLException {
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)"
+                + (updateCacheFrequency == 0 ? "" : 
"UPDATE_CACHE_FREQUENCY="+updateCacheFrequency));
+    }
+
+    private void upsert(Connection conn, String tableName) throws SQLException 
{
+        conn.createStatement().execute("UPSERT INTO " + tableName +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + 
RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
+
+    private void query(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName);
+        rs.next();
+    }
+
+    private void alterTableAddColumn(Connection conn, String tableName, String 
columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF 
NOT EXISTS "
+                + columnName + " INTEGER");
+    }
+
+    /**
+     * Client-1 creates a table, upserts data and alters the table.
+     * Client-2 queries the table before and after the alter.
+     * Check queries work successfully in both cases and verify number of 
getTable RPCs.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestamp() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = 
Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = 
Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumGetTableRPCs;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table with UCF=never and upsert data using client-1
+            createTable(conn1, tableName, Long.MAX_VALUE);
+            upsert(conn1, tableName);
+
+            // select query from client-2 works to populate client side 
metadata cache
+            // there should be 1 getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+
+            // add column using client-1 to update last ddl timestamp
+            alterTableAddColumn(conn1, tableName, "newCol1");
+
+            // invalidate region server cache
+            ServerMetadataCache.resetCache();
+
+            // select query from client-2 with old ddl timestamp works
+            // there should be one more getTable RPC
+            query(conn2, tableName);
+            expectedNumGetTableRPCs += 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))
+                    .getTable((PName) isNull(),
+                            any(), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                            anyLong(), anyLong());
+
+            // select query from client-2 with latest ddl timestamp works
+            // there should be no more getTable RPCs
+            query(conn2, tableName);
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumGetTableRPCs))

Review Comment:
   Is there any way we can test if this query _did not_ encounter 
StaleMetadataCacheException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to