[ https://issues.apache.org/jira/browse/PHOENIX-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801068#comment-17801068 ]

ASF GitHub Bot commented on PHOENIX-6968:
-----------------------------------------

shahrs87 commented on code in PR #1691:
URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437918859


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java:
##########
@@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn(
         }
     }
 
+    /**
+     * Invalidate metadata cache from all region servers for the given tenant and table name.
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCache(byte[] tenantId, byte[] schemaName,
+            byte[] tableOrViewName) throws Throwable {
+        Configuration conf = env.getConfiguration();
+        String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
+        if (value == null
+                || !value.contains(PhoenixRegionServerEndpoint.class.getName())) {
+            // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache.
+            LOGGER.info("Skip invalidating server metadata cache for tenantID: {},"
+                            + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint"
+                            + " is not loaded", Bytes.toString(tenantId),
+                    Bytes.toString(schemaName), Bytes.toString(tableOrViewName));
+            return;
+        }
+        Properties properties = new Properties();
+        // Skip checking of system table existence since the system tables should have been
+        // created by now.
+        properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
+        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties,
+                env.getConfiguration()).unwrap(PhoenixConnection.class);
+             Admin admin = connection.getQueryServices().getAdmin()) {
+            // This will incur an extra RPC to the master. This RPC is required since we want to
+            // get the current list of regionservers.
+            Collection<ServerName> serverNames = admin.getRegionServers(true);
+            invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName,
+                    tableOrViewName, false);
+        }
+    }
+
+    /**
+     * Invalidate metadata cache on all regionservers for the given tenantID
+     * and tableName, with retries. We retry once before failing the operation.
+     *
+     * @param admin
+     * @param serverNames
+     * @param tenantId
+     * @param schemaName
+     * @param tableOrViewName
+     * @param isRetry
+     * @throws Throwable
+     */
+    private void invalidateServerMetadataCacheWithRetries(Admin admin,
+            Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName,
+            byte[] tableOrViewName, boolean isRetry) throws Throwable {
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName);
+        String tenantIDStr = Bytes.toString(tenantId);
+        LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for"
+                        + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName,
+                serverNames, isRetry);
+        RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request =
+                getRequest(tenantId, schemaName, tableOrViewName);
+        // TODO Do I need my own executor or can I re-use QueryServices#Executor
+        //  since it is supposed to be used only for scans according to documentation?
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        Map<Future, ServerName> map = new HashMap<>();
+        for (ServerName serverName : serverNames) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    PhoenixStopWatch innerWatch = new PhoenixStopWatch().start();
+                    // TODO Using the same as ServerCacheClient but need to think if we need some
+                    // special controller for invalidating cache since this is in the path of
+                    // DDL operations. We also need to think if we need separate RPC handler
+                    // threads for this.
+                    ServerRpcController controller = new ServerRpcController();
+                    RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                            service = RegionServerEndpointProtos.RegionServerEndpointService
+                            .newBlockingStub(admin.coprocessorService(serverName));
+                    LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}"
+                            + " to region server: {}", tenantIDStr, fullTableName, serverName);
+                    // The timeout for this particular request is managed by the config parameter:
+                    // hbase.rpc.timeout. Even if the future times out, this runnable can be in
+                    // RUNNING state and will not be interrupted.
+                    service.invalidateServerMetadataCache(controller, request);
+                    LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}"
+                            + " on region server: {} completed successfully and it took {} ms",
+                            tenantIDStr, fullTableName, serverName,
+                            innerWatch.stop().elapsedMillis());
+                    // TODO Create a histogram metric for time taken for invalidating the cache.
+                } catch (ServiceException se) {
+                    LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}"
+                                    + " failed for regionserver {}", tenantIDStr, fullTableName,
+                            serverName, se);
+                    IOException ioe = ServerUtil.parseServiceException(se);
+                    throw new CompletionException(ioe);
+                }
+            });
+            futures.add(future);
+            map.put(future, serverName);
+        }
+
+        // Here we create one master-like future which tracks the individual futures
+        // for each region server.
+        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[0]));
+        try {
+            allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Throwable t) {

Review Comment:
   In case of a non-declared exception, what do you think we should do? Should we just fail the DDL operation and not retry?
   I was thinking of catching all the exceptions, retrying once, and if it still fails, failing the operation.





> Create PhoenixRegionServerEndpoint#invalidateCache method to invalidate cache.
> ------------------------------------------------------------------------------
>
>                 Key: PHOENIX-6968
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6968
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Rushabh Shah
>            Assignee: Rushabh Shah
>            Priority: Major
>
> Whenever we update metadata (like alter table add column, drop table), we
> need to invalidate the metadata cache entry (introduced by PHOENIX-6943) on all
> the regionservers that have that cache entry. The first step would be to issue an
> invalidate command on all the regionservers, irrespective of whether a given
> regionserver has the cache entry. We can further optimize by invalidating
> only on the regionservers that have that cache entry.
> In PHOENIX-6988 we created PhoenixRegionServerEndpoint, which implements
> RegionServerCoprocessor. We can create a new method in this coprocessor, something
> like invalidateCache(CacheEntry), to invalidate the cache for a given
> table/view/index.
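
As a rough illustration of the invalidateCache(CacheEntry) idea described above (not the actual PR code; the class name and key layout below are hypothetical), a regionserver-local metadata cache with an invalidate entry point could look like:

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a regionserver-side metadata cache that the coprocessor
// endpoint could call into when it receives an invalidate request.
public class ServerMetadataCacheSketch {

    // Hypothetical cache key: tenant + schema + table/view name.
    static final class CacheKey {
        private final String tenantId;
        private final String schemaName;
        private final String tableOrViewName;

        CacheKey(String tenantId, String schemaName, String tableOrViewName) {
            this.tenantId = tenantId;
            this.schemaName = schemaName;
            this.tableOrViewName = tableOrViewName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CacheKey)) {
                return false;
            }
            CacheKey other = (CacheKey) o;
            return Objects.equals(tenantId, other.tenantId)
                    && Objects.equals(schemaName, other.schemaName)
                    && Objects.equals(tableOrViewName, other.tableOrViewName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tenantId, schemaName, tableOrViewName);
        }
    }

    // Placeholder for the cached metadata, e.g. the last DDL timestamp of the table.
    private final ConcurrentMap<CacheKey, Long> cache = new ConcurrentHashMap<>();

    // Called when the endpoint receives an invalidate request for a table/view/index.
    public void invalidate(String tenantId, String schemaName, String tableOrViewName) {
        cache.remove(new CacheKey(tenantId, schemaName, tableOrViewName));
    }
}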



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
