[ https://issues.apache.org/jira/browse/PHOENIX-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801069#comment-17801069 ]
ASF GitHub Bot commented on PHOENIX-6968: ----------------------------------------- shahrs87 commented on code in PR #1691: URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437919100 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java: ########## @@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn( } } + /** + * Invalidate metadata cache from all region servers for the given tenant and table name. + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @throws Throwable + */ + private void invalidateServerMetadataCache(byte[] tenantId, byte[]schemaName, + byte[] tableOrViewName) throws Throwable { + Configuration conf = env.getConfiguration(); + String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY); + if (value == null + || !value.contains(PhoenixRegionServerEndpoint.class.getName())) { + // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache. + LOGGER.info("Skip invalidating server metadata cache for tenantID: {}," + + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint" + + " is not loaded", Bytes.toString(tenantId), + Bytes.toString(schemaName), Bytes.toString(tableOrViewName)); + return; + } + Properties properties = new Properties(); + // Skip checking of system table existence since the system tables should have created + // by now. + properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true"); + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties, + env.getConfiguration()).unwrap(PhoenixConnection.class); + Admin admin = connection.getQueryServices().getAdmin()) { + // This will incur an extra RPC to the master. This RPC is required since we want to + // get current list of regionservers. + Collection<ServerName> serverNames = admin.getRegionServers(true); + invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName, + tableOrViewName, false); + } + } + + /** + * Invalidate metadata cache on all regionservers with retries for the given tenantID + * and tableName with retries. We retry once before failing the operation. + * + * @param admin + * @param serverNames + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @param isRetry + * @throws Throwable + */ + private void invalidateServerMetadataCacheWithRetries(Admin admin, + Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName, + byte[] tableOrViewName, boolean isRetry) throws Throwable { + String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName); + String tenantIDStr = Bytes.toString(tenantId); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for" + + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName, + serverNames, isRetry); + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request = + getRequest(tenantId, schemaName, tableOrViewName); + // TODO Do I need my own executor or can I re-use QueryServices#Executor + // since it is supposed to be used only for scans according to documentation? + List<CompletableFuture<Void>> futures = new ArrayList<>(); + Map<Future, ServerName> map = new HashMap<>(); + for (ServerName serverName : serverNames) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + PhoenixStopWatch innerWatch = new PhoenixStopWatch().start(); + // TODO Using the same as ServerCacheClient but need to think if we need some + // special controller for invalidating cache since this is in the path of + // DDL operations. We also need to think of we need separate RPC handler + // threads for this? + ServerRpcController controller = new ServerRpcController(); + RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface + service = RegionServerEndpointProtos.RegionServerEndpointService + .newBlockingStub(admin.coprocessorService(serverName)); + LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}" + + " to region server: {}", tenantIDStr, fullTableName, serverName); + // The timeout for this particular request is managed by config parameter: + // hbase.rpc.timeout. Even if the future times out, this runnable can be in + // RUNNING state and will not be interrupted. + service.invalidateServerMetadataCache(controller, request); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " on region server: {} completed successfully and it took {} ms", + tenantIDStr, fullTableName, serverName, + innerWatch.stop().elapsedMillis()); + // TODO Create a histogram metric for time taken for invalidating the cache. + } catch (ServiceException se) { + LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " failed for regionserver {}", tenantIDStr, fullTableName, + serverName, se); + IOException ioe = ServerUtil.parseServiceException(se); + throw new CompletionException(ioe); + } + }); + futures.add(future); + map.put(future, serverName); + } + + // Here we create one master like future which tracks individual future + // for each region server. + CompletableFuture<Void> allFutures = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0])); + try { + allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + List<ServerName> failedServers = getFailedServers(futures, map); + LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {} failed for " Review Comment: I can change it to WARN. WDYT? @haridsv > Create PhoenixRegionServerEndpoint#invalidateCache method to invalidate cache. > ------------------------------------------------------------------------------ > > Key: PHOENIX-6968 > URL: https://issues.apache.org/jira/browse/PHOENIX-6968 > Project: Phoenix > Issue Type: Sub-task > Reporter: Rushabh Shah > Assignee: Rushabh Shah > Priority: Major > > Whenever we update metadata (like alter table add column, drop table), we > need to invalidate metadata cache entry (introduced by PHOENIX-6943) on all > the regionservers which has that cache entry. First step would be to issue an > invalidate command on all the regionservers irrespective of whether that > regionserver has the cache entry. We can further optimize by invalidating > only on RS that has that cache entry. > In PHOENIX-6988 we created PhoenixRegionServerEndpoint implementing > RegionServerCoprocessor. We can create a new method in this co-proc something > like invalidateCache(CacheEntry) to invalidate cache for a given > table/view/index. -- This message was sent by Atlassian Jira (v8.20.10#820010)