[ https://issues.apache.org/jira/browse/PHOENIX-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824191#comment-17824191 ]

ASF GitHub Bot commented on PHOENIX-7251:
-----------------------------------------

shahrs87 commented on code in PR #1845:
URL: https://github.com/apache/phoenix/pull/1845#discussion_r1515171337


##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+    private static volatile Map<ServerName, ServerMetadataCacheTestImpl> INSTANCES = new HashMap<>();
+    private Connection connectionForTesting;
+    ServerMetadataCacheTestImpl(Configuration conf) {

Review Comment:
   Add a blank line before the constructor.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+    private static volatile Map<ServerName, ServerMetadataCacheTestImpl> INSTANCES = new HashMap<>();
+    private Connection connectionForTesting;
+    ServerMetadataCacheTestImpl(Configuration conf) {
+        super(conf);
+    }
+
+    public static ServerMetadataCacheTestImpl getInstance(Configuration conf, ServerName serverName) {
+        ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+        if (result == null) {
+            synchronized (ServerMetadataCacheTestImpl.class) {
+                result = INSTANCES.get(serverName);
+                if (result == null) {
+                    result = new ServerMetadataCacheTestImpl(conf);
+                    INSTANCES.put(serverName, result);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static void setInstance(ServerName serverName, ServerMetadataCacheTestImpl cache) {
+        INSTANCES.put(serverName, cache);
+    }
+
+    public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
+                                                         byte[] tableName) {
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+    }
+
+    public void setConnectionForTesting(Connection connection) {
+        this.connectionForTesting = connection;
+    }
+
+    public static void resetCache() {
+        INSTANCES.clear();
+    }
+
+    @Override
+    protected Connection getConnection(Properties properties) throws SQLException {
+        System.out.println("USED");
+        return connectionForTesting != null ? connectionForTesting
+                : QueryUtil.getConnectionOnServer(properties, this.conf);

Review Comment:
   ```suggestion
           return connectionForTesting != null ? connectionForTesting
                   : super.getConnection(properties);
   ```
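   
   Taken together with the debug-print removal requested below, the override
   would then read roughly as follows (a sketch only, assuming the
   connectionForTesting field and superclass getConnection shown in this diff):
   ```
   @Override
   protected Connection getConnection(Properties properties) throws SQLException {
       // Prefer the injected test connection; otherwise fall back to the
       // production lookup in ServerMetadataCacheImpl#getConnection.
       return connectionForTesting != null ? connectionForTesting
               : super.getConnection(properties);
   }
   ```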



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+    private static volatile Map<ServerName, ServerMetadataCacheTestImpl> INSTANCES = new HashMap<>();
+    private Connection connectionForTesting;
+    ServerMetadataCacheTestImpl(Configuration conf) {
+        super(conf);
+    }
+
+    public static ServerMetadataCacheTestImpl getInstance(Configuration conf, ServerName serverName) {
+        ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+        if (result == null) {
+            synchronized (ServerMetadataCacheTestImpl.class) {
+                result = INSTANCES.get(serverName);
+                if (result == null) {
+                    result = new ServerMetadataCacheTestImpl(conf);
+                    INSTANCES.put(serverName, result);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static void setInstance(ServerName serverName, ServerMetadataCacheTestImpl cache) {
+        INSTANCES.put(serverName, cache);
+    }
+
+    public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID, byte[] schemaName,
+                                                         byte[] tableName) {
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+    }
+
+    public void setConnectionForTesting(Connection connection) {
+        this.connectionForTesting = connection;
+    }
+
+    public static void resetCache() {
+        INSTANCES.clear();
+    }
+
+    @Override
+    protected Connection getConnection(Properties properties) throws SQLException {
+        System.out.println("USED");

Review Comment:
   Remove the System.out.println debug statement.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
+import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+/**
+ * This manages the cache for all the objects (data table, views, indexes) on each region server.
+ * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ */
+public class ServerMetadataCacheImpl implements ServerMetadataCache {
+
+    protected Configuration conf;
+    // key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
+    protected final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheImpl.class);
+    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
+            "phoenix.coprocessor.regionserver.cache.ttl.ms";
+    // Keeping default cache expiry for 30 mins since we won't have stale entry
+    // for more than 30 mins.
+    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
+            = 30 * 60 * 1000L; // 30 mins
+    private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
+            = "phoenix.coprocessor.regionserver.cache.size";
+    private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
+    private static volatile ServerMetadataCacheImpl INSTANCE;
+    private MetricsMetadataCachingSource metricsSource;
+
+    /**
+     * Creates/gets an instance of ServerMetadataCache.
+     *
+     * @param conf configuration
+     * @return cache
+     */
+    public static ServerMetadataCacheImpl getInstance(Configuration conf) {
+        ServerMetadataCacheImpl result = INSTANCE;
+        if (result == null) {
+            synchronized (ServerMetadataCacheImpl.class) {
+                result = INSTANCE;
+                if (result == null) {
+                    INSTANCE = result = new ServerMetadataCacheImpl(conf);
+                }
+            }
+        }
+        return result;
+    }
+
+    public ServerMetadataCacheImpl(Configuration conf) {
+        this.conf = conf;
+        this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
+                                .getInstance().getMetadataCachingSource();
+        long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
+                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
+        long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
+                DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
+        lastDDLTimestampMap = CacheBuilder.newBuilder()
+                .removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
+                    String key = notification.getKey().toString();
+                    LOGGER.debug("Expiring " + key + " because of "
+                            + notification.getCause().name());
+                })
+                // maximum number of entries this cache can handle.
+                .maximumSize(maxSize)
+                .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    /**
+     * Returns the last DDL timestamp for the table.
+     * If not found in the cache, queries the SYSCAT region server.
+     * @param tenantID tenant id
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return last DDL timestamp
+     * @throws SQLException if the lookup against SYSTEM.CATALOG fails
+     */
+    public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
+            throws SQLException {
+        String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        // Lookup in cache if present.
+        Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+        if (lastDDLTimestamp != null) {
+            metricsSource.incrementRegionServerMetadataCacheHitCount();
+            LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
+                    fullTableNameStr);
+            return lastDDLTimestamp;
+        }
+        metricsSource.incrementRegionServerMetadataCacheMissCount();
+        PTable table;
+        String tenantIDStr = Bytes.toString(tenantID);
+        if (tenantIDStr == null || tenantIDStr.isEmpty()) {
+            tenantIDStr = null;
+        }
+        Properties properties = new Properties();
+        if (tenantIDStr != null) {
+            properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
+        }
+        try (Connection connection = getConnection(properties)) {
+            // Using PhoenixRuntime#getTableNoCache since we don't want to read a cached value.
+            table = PhoenixRuntime.getTableNoCache(connection, fullTableNameStr);
+            // TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
+            //  In that case, do we want to throw a non-retryable exception back to the client?
+            // Update cache with the latest DDL timestamp from SYSCAT server.
+            lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
+        }
+        return table.getLastDDLTimestamp();
+    }
+
+    /**
+     * Invalidate cache for the given tenantID, schema name and table name.
+     * Guava cache is thread safe so we don't have to synchronize it explicitly.
+     * @param tenantID tenantID
+     * @param schemaName schemaName
+     * @param tableName tableName
+     */
+    public void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName) {
+        String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
+        LOGGER.debug("Invalidating server metadata cache for tenantID: {}, full table: {}",
+                Bytes.toString(tenantID), fullTableNameStr);
+        byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
+        ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+        lastDDLTimestampMap.invalidate(tableKeyPtr);
+    }
+
+    protected Connection getConnection(Properties properties) throws SQLException {
+        return QueryUtil.getConnectionOnServer(properties, this.conf);
+    }
+
+    protected static void resetCache() {

Review Comment:
   Do we need this method here? 



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheImplTest.java:
##########
@@ -121,7 +127,7 @@ public void testCacheForBaseTable() throws Exception {
             createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn,
                     tableNameStr);// --> First call to CQSI#getTable
-            ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCacheTestImpl cache = ServerMetadataCacheTestImpl.getInstance(config, serverName);

Review Comment:
   Instead of directly calling ServerMetadataCacheTestImpl, we should get the
   cache via the findCoprocessor method, something like this:
   ```
   private ServerMetadataCache getCache() {
       String phoenixRegionServerEndpoint = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY);
       assertNotNull(phoenixRegionServerEndpoint);
       PhoenixRegionServerEndpoint coproc = regionServer.getRegionServerCoprocessorHost()
               .findCoprocessor(PhoenixRegionServerEndpoint.class);
       assertNotNull(coproc);
       ServerMetadataCache cache = coproc.getServerMetadataCache();
       assertNotNull(cache);
       return cache;
   }
   ```
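   
   Going through the coprocessor host exercises the same lookup path the region
   server itself uses, instead of reaching into the test-only static map. Note
   that getServerMetadataCache() is taken from the sketch above and assumes
   PhoenixRegionServerEndpoint exposes such an accessor.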
   



##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheImplTest.java:
##########
@@ -76,11 +78,13 @@
 import static org.mockito.Mockito.verify;
 
 @Category(ParallelStatsDisabledIT.class)
-public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+public class ServerMetadataCacheImplTest extends ParallelStatsDisabledIT {

Review Comment:
   Do we want to rename this class to `ServerMetadataCacheIT` to avoid confusion
   between `ServerMetadataCacheImplTest` and `ServerMetadataCacheTestImpl`?





> Refactor server-side code to support multiple ServerMetadataCache for HA tests
> ------------------------------------------------------------------------------
>
>                 Key: PHOENIX-7251
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7251
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Palash Chauhan
>            Assignee: Palash Chauhan
>            Priority: Major
>
> In the metadata caching re-design, `ServerMetadataCache` is required to be a 
> singleton in the implementation. This affects tests for the HA use case 
> because the coprocessors on the two clusters end up using the same 
> `ServerMetadataCache`. All tests which execute queries while one of the 
> clusters is unavailable will fail. 
> We can refactor the implementation in the following way to support the HA 
> test cases:
> 1. Create a `ServerMetadataCache` interface and keep the current 
> implementation as `ServerMetadataCacheImpl` for all other tests. This would 
> be a singleton.
> 2. Implement `ServerMetadataCacheHAImpl` with a map of instances keyed on 
> config.
> 3. Extend `PhoenixRegionServerEndpoint` and use `ServerMetadataCacheHAImpl`. 
> 4. In HA tests, load this new endpoint on the region servers. (A sketch of 
> this structure follows below.)

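For reference, a minimal sketch of the structure the ticket describes. The
interface methods are taken from the ServerMetadataCacheImpl diff above; the
HA variant's constructor and map keying are assumptions based on point 2, not
code from the PR:

```
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;

// Point 1: extract an interface; ServerMetadataCacheImpl stays the singleton
// used by all non-HA tests.
interface ServerMetadataCache {
    long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
            throws SQLException;
    void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName);
}

// Point 2: an HA variant keeping one cache per cluster configuration instead
// of a process-wide singleton. Keying is illustrative: Configuration compares
// by identity, which is enough to separate the two mini-clusters in a test.
class ServerMetadataCacheHAImpl extends ServerMetadataCacheImpl {
    private static final Map<Configuration, ServerMetadataCacheHAImpl> INSTANCES =
            new ConcurrentHashMap<>();

    ServerMetadataCacheHAImpl(Configuration conf) {
        super(conf);
    }

    public static ServerMetadataCacheHAImpl getInstance(Configuration conf) {
        // One instance per cluster's Configuration object.
        return INSTANCES.computeIfAbsent(conf, ServerMetadataCacheHAImpl::new);
    }
}
```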


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
