shahrs87 commented on code in PR #1691:
URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437919100


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java:
##########
@@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn(
         }
     }
 
+    /**
+     * Invalidate metadata cache from all region servers for the given tenant 
and table name.
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCache(byte[] tenantId, 
byte[]schemaName,
+            byte[] tableOrViewName) throws Throwable {
+        Configuration conf = env.getConfiguration();
+        String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
+        if (value == null
+                || 
!value.contains(PhoenixRegionServerEndpoint.class.getName())) {
+            // PhoenixRegionServerEndpoint is not loaded. We don't have to 
invalidate the cache.
+            LOGGER.info("Skip invalidating server metadata cache for tenantID: 
{},"
+                            + " schema name: {}, table Name: {} since 
PhoenixRegionServerEndpoint"
+                            + " is not loaded", Bytes.toString(tenantId),
+                    Bytes.toString(schemaName), 
Bytes.toString(tableOrViewName));
+            return;
+        }
+        Properties properties = new Properties();
+        // Skip checking of system table existence since the system tables 
should have created
+        // by now.
+        properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
+        try (PhoenixConnection connection = 
QueryUtil.getConnectionOnServer(properties,
+                env.getConfiguration()).unwrap(PhoenixConnection.class);
+             Admin admin = connection.getQueryServices().getAdmin()) {
+            // This will incur an extra RPC to the master. This RPC is 
required since we want to
+            // get current list of regionservers.
+            Collection<ServerName> serverNames = admin.getRegionServers(true);
+            invalidateServerMetadataCacheWithRetries(admin, serverNames, 
tenantId, schemaName,
+                    tableOrViewName, false);
+        }
+    }
+
+    /**
+     * Invalidate metadata cache on all regionservers with retries for the 
given tenantID
+     * and tableName with retries. We retry once before failing the operation.
+     *
+     * @param admin
+     * @param serverNames
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @param isRetry
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCacheWithRetries(Admin admin,
+            Collection<ServerName> serverNames, byte[] tenantId, byte[] 
schemaName,
+            byte[] tableOrViewName, boolean isRetry) throws Throwable {
+        String fullTableName = SchemaUtil.getTableName(schemaName, 
tableOrViewName);
+        String tenantIDStr = Bytes.toString(tenantId);
+        LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: 
{} for"
+                        + " region servers: {}, isRetry: {}", tenantIDStr, 
fullTableName,
+                serverNames, isRetry);
+        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest 
request =
+                getRequest(tenantId, schemaName, tableOrViewName);
+        // TODO Do I need my own executor or can I re-use 
QueryServices#Executor
+        //  since it is supposed to be used only for scans according to 
documentation?
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        Map<Future, ServerName> map = new HashMap<>();
+        for (ServerName serverName : serverNames) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    PhoenixStopWatch innerWatch = new 
PhoenixStopWatch().start();
+                    // TODO Using the same as ServerCacheClient but need to 
think if we need some
+                    // special controller for invalidating cache since this is 
in the path of
+                    // DDL operations. We also need to think of we need 
separate RPC handler
+                    // threads for this?
+                    ServerRpcController controller = new ServerRpcController();
+                    
RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                            service = 
RegionServerEndpointProtos.RegionServerEndpointService
+                            
.newBlockingStub(admin.coprocessorService(serverName));
+                    LOGGER.info("Sending invalidate metadata cache for 
tenantID: {}, tableName: {}"
+                            + " to region server: {}", tenantIDStr, 
fullTableName, serverName);
+                    // The timeout for this particular request is managed by 
config parameter:
+                    // hbase.rpc.timeout. Even if the future times out, this 
runnable can be in
+                    // RUNNING state and will not be interrupted.
+                    service.invalidateServerMetadataCache(controller, request);
+                    LOGGER.info("Invalidating metadata cache for tenantID: {}, 
tableName: {}"
+                            + " on region server: {} completed successfully 
and it took {} ms",
+                            tenantIDStr, fullTableName, serverName,
+                            innerWatch.stop().elapsedMillis());
+                    // TODO Create a histogram metric for time taken for 
invalidating the cache.
+                } catch (ServiceException se) {
+                    LOGGER.error("Invalidating metadata cache for tenantID: 
{}, tableName: {}"
+                                    + " failed for regionserver {}", 
tenantIDStr, fullTableName,
+                            serverName, se);
+                    IOException ioe = ServerUtil.parseServiceException(se);
+                    throw new CompletionException(ioe);
+                }
+            });
+            futures.add(future);
+            map.put(future, serverName);
+        }
+
+        // Here we create one master like future which tracks individual future
+        // for each region server.
+        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[0]));
+        try {
+            allFutures.get(metadataCacheInvalidationTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (Throwable t) {
+            List<ServerName> failedServers = getFailedServers(futures, map);
+            LOGGER.error("Invalidating metadata cache for tenantID: {}, 
tableName: {} failed for "

Review Comment:
   I can change it to WARN. WDYT? @haridsv 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to