shahrs87 commented on code in PR #1691: URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437918859
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java: ########## @@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn( } } + /** + * Invalidate metadata cache from all region servers for the given tenant and table name. + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @throws Throwable + */ + private void invalidateServerMetadataCache(byte[] tenantId, byte[]schemaName, + byte[] tableOrViewName) throws Throwable { + Configuration conf = env.getConfiguration(); + String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY); + if (value == null + || !value.contains(PhoenixRegionServerEndpoint.class.getName())) { + // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache. + LOGGER.info("Skip invalidating server metadata cache for tenantID: {}," + + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint" + + " is not loaded", Bytes.toString(tenantId), + Bytes.toString(schemaName), Bytes.toString(tableOrViewName)); + return; + } + Properties properties = new Properties(); + // Skip checking of system table existence since the system tables should have created + // by now. + properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true"); + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties, + env.getConfiguration()).unwrap(PhoenixConnection.class); + Admin admin = connection.getQueryServices().getAdmin()) { + // This will incur an extra RPC to the master. This RPC is required since we want to + // get current list of regionservers. + Collection<ServerName> serverNames = admin.getRegionServers(true); + invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName, + tableOrViewName, false); + } + } + + /** + * Invalidate metadata cache on all regionservers with retries for the given tenantID + * and tableName with retries. We retry once before failing the operation. 
+ * + * @param admin + * @param serverNames + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @param isRetry + * @throws Throwable + */ + private void invalidateServerMetadataCacheWithRetries(Admin admin, + Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName, + byte[] tableOrViewName, boolean isRetry) throws Throwable { + String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName); + String tenantIDStr = Bytes.toString(tenantId); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for" + + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName, + serverNames, isRetry); + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request = + getRequest(tenantId, schemaName, tableOrViewName); + // TODO Do I need my own executor or can I re-use QueryServices#Executor + // since it is supposed to be used only for scans according to documentation? + List<CompletableFuture<Void>> futures = new ArrayList<>(); + Map<Future, ServerName> map = new HashMap<>(); + for (ServerName serverName : serverNames) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + PhoenixStopWatch innerWatch = new PhoenixStopWatch().start(); + // TODO Using the same as ServerCacheClient but need to think if we need some + // special controller for invalidating cache since this is in the path of + // DDL operations. We also need to think of we need separate RPC handler + // threads for this? 
+ ServerRpcController controller = new ServerRpcController(); + RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface + service = RegionServerEndpointProtos.RegionServerEndpointService + .newBlockingStub(admin.coprocessorService(serverName)); + LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}" + + " to region server: {}", tenantIDStr, fullTableName, serverName); + // The timeout for this particular request is managed by config parameter: + // hbase.rpc.timeout. Even if the future times out, this runnable can be in + // RUNNING state and will not be interrupted. + service.invalidateServerMetadataCache(controller, request); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " on region server: {} completed successfully and it took {} ms", + tenantIDStr, fullTableName, serverName, + innerWatch.stop().elapsedMillis()); + // TODO Create a histogram metric for time taken for invalidating the cache. + } catch (ServiceException se) { + LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " failed for regionserver {}", tenantIDStr, fullTableName, + serverName, se); + IOException ioe = ServerUtil.parseServiceException(se); + throw new CompletionException(ioe); + } + }); + futures.add(future); + map.put(future, serverName); + } + + // Here we create one master like future which tracks individual future + // for each region server. + CompletableFuture<Void> allFutures = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0])); + try { + allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Throwable t) { Review Comment: In case of a non-declared exception, what do you think we should do? Should we just fail the DDL operation and not retry? I was thinking we should catch all exceptions, retry once, and if it still fails, then fail the operation. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org