[ https://issues.apache.org/jira/browse/PHOENIX-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801070#comment-17801070 ]

ASF GitHub Bot commented on PHOENIX-6968:
-----------------------------------------

shahrs87 commented on code in PR #1691:
URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437921306


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java:
##########
@@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn(
         }
     }
 
+    /**
+     * Invalidate the metadata cache on all region servers for the given tenant and table name.
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCache(byte[] tenantId, byte[] schemaName,
+            byte[] tableOrViewName) throws Throwable {
+        Configuration conf = env.getConfiguration();
+        String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
+        if (value == null
+                || !value.contains(PhoenixRegionServerEndpoint.class.getName())) {
+            // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache.
+            LOGGER.info("Skip invalidating server metadata cache for tenantID: {},"
+                            + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint"
+                            + " is not loaded", Bytes.toString(tenantId),
+                    Bytes.toString(schemaName), Bytes.toString(tableOrViewName));
+            return;
+        }
+        Properties properties = new Properties();
+        // Skip the check for system table existence since the system tables should have been
+        // created by now.
+        properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
+        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties,
+                env.getConfiguration()).unwrap(PhoenixConnection.class);
+             Admin admin = connection.getQueryServices().getAdmin()) {
+            // This will incur an extra RPC to the master. This RPC is required since we want
+            // the current list of regionservers.
+            Collection<ServerName> serverNames = admin.getRegionServers(true);
+            invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName,
+                    tableOrViewName, false);
+        }
+    }
+
+    /**
+     * Invalidate the metadata cache on all regionservers for the given tenantID and
+     * table name, with retries. We retry once before failing the operation.
+     *
+     * @param admin
+     * @param serverNames
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @param isRetry
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCacheWithRetries(Admin admin,
+            Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName,
+            byte[] tableOrViewName, boolean isRetry) throws Throwable {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName);
+        String tenantIDStr = Bytes.toString(tenantId);
+        LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for"
+                        + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName,
+                serverNames, isRetry);
+        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request =
+                getRequest(tenantId, schemaName, tableOrViewName);
+        // TODO Do I need my own executor or can I re-use QueryServices#Executor
+        //  since it is supposed to be used only for scans according to the documentation?
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        Map<Future, ServerName> map = new HashMap<>();
+        for (ServerName serverName : serverNames) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
+                    // TODO Using the same controller as ServerCacheClient, but we need to think
+                    // about whether we need a special controller for invalidating the cache
+                    // since this is in the path of DDL operations. We also need to think about
+                    // whether we need separate RPC handler threads for this.
+                    ServerRpcController controller = new ServerRpcController();
+                    RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                            service = RegionServerEndpointProtos.RegionServerEndpointService
+                            .newBlockingStub(admin.coprocessorService(serverName));
+                    LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}"
+                            + " to region server: {}", tenantIDStr, fullTableName, serverName);
+                    // The timeout for this particular request is managed by the config parameter
+                    // hbase.rpc.timeout. Even if the future times out, this runnable can be in
+                    // RUNNING state and will not be interrupted.
+                    service.invalidateServerMetadataCache(controller, request);
+                    LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}"
+                            + " on region server: {} completed successfully and it took {} ms",
+                            tenantIDStr, fullTableName, serverName,
+                            innerWatch.stop().elapsedMillis());
+                    // TODO Create a histogram metric for the time taken to invalidate the cache.
+                } catch (ServiceException se) {
+                    LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}"
+                                    + " failed for regionserver {}", tenantIDStr, fullTableName,
+                            serverName, se);
+                    IOException ioe = ServerUtil.parseServiceException(se);
+                    throw new CompletionException(ioe);
+                }
+            });
+            futures.add(future);
+            map.put(future, serverName);
+        }
+
+        // Here we create one master-like future which tracks the individual future
+        // for each region server.
+        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[0]));
+        try {
+            allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Throwable t) {
+            List<ServerName> failedServers = getFailedServers(futures, map);
+            LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {} failed for "
+                    + "region servers: {}", tenantIDStr, fullTableName, failedServers, t);
+            if (isRetry) {
+                // If this is a retry attempt then just fail the operation.
+                if (allFutures.isCompletedExceptionally()) {
+                    if (t instanceof ExecutionException) {
+                        t = t.getCause();
+                    }
+                }
+                throw t;
+            } else {
+                // This is the first attempt; we can retry once.
+                // Indicate that this is a retry attempt.
+                invalidateServerMetadataCacheWithRetries(admin, failedServers,
+                        tenantId, schemaName, tableOrViewName, true);
+            }
+        }
+    }
+
+    /*
+        Get the list of regionservers that failed the invalidateCache rpc.
+     */
+    private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures,
+                                              Map<Future, ServerName> map) {
+        List<ServerName> failedServers = new ArrayList<>();
+        for (CompletableFuture completedFuture : futures) {
+            if (completedFuture.isDone() == false) {

Review Comment:
   > Will it have a false value?
   
   Yes, it can have a false value. We also have a [test case](https://github.com/apache/phoenix/blob/PHOENIX-6883-feature/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java#L67) covering this.
   
   > In the timeout case, won't the remote RPC thread be in a stuck state? What is the chance of a retry making progress, besides initiating a concurrent request?
   
   In a later jira, [PHOENIX-7115](https://issues.apache.org/jira/browse/PHOENIX-7115), we added dedicated handler threads on each regionserver just for invalidate cache RPCs; earlier these RPCs were handled by the default handler threads. So the probability of these RPC threads being stuck is very small, and if it does happen we can increase the number of handler threads.
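A minimal, standalone JDK sketch (not Phoenix code) of the timeout behavior discussed above: when `CompletableFuture.allOf(...).get(timeout, ...)` times out, a future whose task is still running reports `isDone() == false`, which is exactly what `getFailedServers` keys on to pick the servers to retry.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class IsDoneOnTimeoutDemo {
    public static void main(String[] args) throws Exception {
        // One future finishes quickly; one simulates a slow invalidate-cache RPC.
        CompletableFuture<Void> fast = CompletableFuture.runAsync(() -> { });
        CompletableFuture<Void> slow = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        CompletableFuture<Void> all = CompletableFuture.allOf(fast, slow);
        try {
            // Analogous to allFutures.get(metadataCacheInvalidationTimeoutMs, ...).
            all.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            // The slow task is still RUNNING, so its future reports isDone() == false;
            // getFailedServers would treat that server as failed and retry it.
            for (CompletableFuture<Void> f : Arrays.asList(fast, slow)) {
                System.out.println("isDone = " + f.isDone());
            }
            // Typically prints: isDone = true, then isDone = false.
        }
    }
}
```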
    





> Create PhoenixRegionServerEndpoint#invalidateCache method to invalidate cache.
> ------------------------------------------------------------------------------
>
>                 Key: PHOENIX-6968
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6968
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Rushabh Shah
>            Assignee: Rushabh Shah
>            Priority: Major
>
> Whenever we update metadata (e.g. alter table add column, drop table), we
> need to invalidate the metadata cache entry (introduced by PHOENIX-6943) on
> all the regionservers that hold that cache entry. The first step is to issue
> an invalidate command on all the regionservers, irrespective of whether a
> given regionserver has the cache entry. We can further optimize by
> invalidating only the regionservers that actually have the entry.
> In PHOENIX-6988 we created PhoenixRegionServerEndpoint, which implements
> RegionServerCoprocessor. We can add a new method to this coprocessor,
> something like invalidateCache(CacheEntry), to invalidate the cache for a
> given table/view/index.
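
For illustration, a minimal, hypothetical sketch of how such an invalidate method could hang off the PhoenixRegionServerEndpoint from PHOENIX-6988. The protobuf service and request type names are taken from the diff above; the response type, the request field accessors, and the ServerMetadataCache helper are assumptions, not the actual Phoenix implementation.

```java
// Hypothetical sketch only; import for the generated RegionServerEndpointProtos omitted.
import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

public class PhoenixRegionServerEndpoint
        extends RegionServerEndpointProtos.RegionServerEndpointService
        implements RegionServerCoprocessor {

    private Configuration conf;

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        this.conf = env.getConfiguration();
    }

    @Override
    public Iterable<Service> getServices() {
        // Expose the protobuf service so Admin#coprocessorService(serverName)
        // on the client side (see the diff above) can reach this endpoint.
        return Collections.singletonList((Service) this);
    }

    @Override
    public void invalidateServerMetadataCache(RpcController controller,
            RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request,
            RpcCallback<RegionServerEndpointProtos.InvalidateServerMetadataCacheResponse> done) {
        // Assumed field accessors: evict the cached entry for (tenantId, schema, table).
        byte[] tenantId = request.getTenantId().toByteArray();
        byte[] schemaName = request.getSchemaName().toByteArray();
        byte[] tableName = request.getTableName().toByteArray();
        // ServerMetadataCache is a hypothetical stand-in for the server-side cache
        // introduced by PHOENIX-6943.
        ServerMetadataCache.getInstance(conf).invalidate(tenantId, schemaName, tableName);
        done.run(RegionServerEndpointProtos.InvalidateServerMetadataCacheResponse
                .getDefaultInstance());
    }
}
```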


