[
https://issues.apache.org/jira/browse/PHOENIX-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824191#comment-17824191
]
ASF GitHub Bot commented on PHOENIX-7251:
-----------------------------------------
shahrs87 commented on code in PR #1845:
URL: https://github.com/apache/phoenix/pull/1845#discussion_r1515171337
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver
ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+ private static volatile Map<ServerName, ServerMetadataCacheTestImpl>
INSTANCES = new HashMap<>();
+ private Connection connectionForTesting;
+ ServerMetadataCacheTestImpl(Configuration conf) {
Review Comment:
Please add a blank line before the constructor.
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver
ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+ private static volatile Map<ServerName, ServerMetadataCacheTestImpl>
INSTANCES = new HashMap<>();
+ private Connection connectionForTesting;
+ ServerMetadataCacheTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ public static ServerMetadataCacheTestImpl getInstance(Configuration conf,
ServerName serverName) {
+ ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+ if (result == null) {
+ synchronized (ServerMetadataCacheTestImpl.class) {
+ result = INSTANCES.get(serverName);
+ if (result == null) {
+ result = new ServerMetadataCacheTestImpl(conf);
+ INSTANCES.put(serverName, result);
+ }
+ }
+ }
+ return result;
+ }
+
+ public static void setInstance(ServerName serverName,
ServerMetadataCacheTestImpl cache) {
+ INSTANCES.put(serverName, cache);
+ }
+
+ public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID,
byte[] schemaName,
+ byte[] tableName) {
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName,
tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+ }
+
+ public void setConnectionForTesting(Connection connection) {
+ this.connectionForTesting = connection;
+ }
+
+ public static void resetCache() {
+ INSTANCES.clear();
+ }
+
+ @Override
+ protected Connection getConnection(Properties properties) throws
SQLException {
+ System.out.println("USED");
+ return connectionForTesting != null ? connectionForTesting
+ : QueryUtil.getConnectionOnServer(properties, this.conf);
Review Comment:
```suggestion
return connectionForTesting != null ? connectionForTesting :
super.getConnection(properties);
```
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerMetadataCacheTestImpl.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.cache.ServerMetadataCacheImpl;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of {@link ServerMetadataCache} for Integration Tests.
+ * Supports keeping more than one instance keyed on the regionserver
ServerName.
+ */
+public class ServerMetadataCacheTestImpl extends ServerMetadataCacheImpl {
+ private static volatile Map<ServerName, ServerMetadataCacheTestImpl>
INSTANCES = new HashMap<>();
+ private Connection connectionForTesting;
+ ServerMetadataCacheTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ public static ServerMetadataCacheTestImpl getInstance(Configuration conf,
ServerName serverName) {
+ ServerMetadataCacheTestImpl result = INSTANCES.get(serverName);
+ if (result == null) {
+ synchronized (ServerMetadataCacheTestImpl.class) {
+ result = INSTANCES.get(serverName);
+ if (result == null) {
+ result = new ServerMetadataCacheTestImpl(conf);
+ INSTANCES.put(serverName, result);
+ }
+ }
+ }
+ return result;
+ }
+
+ public static void setInstance(ServerName serverName,
ServerMetadataCacheTestImpl cache) {
+ INSTANCES.put(serverName, cache);
+ }
+
+ public Long getLastDDLTimestampForTableFromCacheOnly(byte[] tenantID,
byte[] schemaName,
+ byte[] tableName) {
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName,
tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ return lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+ }
+
+ public void setConnectionForTesting(Connection connection) {
+ this.connectionForTesting = connection;
+ }
+
+ public static void resetCache() {
+ INSTANCES.clear();
+ }
+
+ @Override
+ protected Connection getConnection(Properties properties) throws
SQLException {
+ System.out.println("USED");
Review Comment:
Please remove this `System.out.println` debug statement.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import
org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
+import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+/**
+ * This manages the cache for all the objects(data table, views, indexes) on
each region server.
+ * Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
+ */
+public class ServerMetadataCacheImpl implements ServerMetadataCache {
+
+ protected Configuration conf;
+ // key is the combination of <tenantID, schema name, table name>, value is
the lastDDLTimestamp
+ protected final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServerMetadataCacheImpl.class);
+ private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
+ "phoenix.coprocessor.regionserver.cache.ttl.ms";
+ // Keeping default cache expiry for 30 mins since we won't have stale entry
+ // for more than 30 mins.
+ private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
+ = 30 * 60 * 1000L; // 30 mins
+ private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
+ = "phoenix.coprocessor.regionserver.cache.size";
+ private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE =
10000L;
+ private static volatile ServerMetadataCacheImpl INSTANCE;
+ private MetricsMetadataCachingSource metricsSource;
+
+ /**
+ * Creates/gets an instance of ServerMetadataCache.
+ *
+ * @param conf configuration
+ * @return cache
+ */
+ public static ServerMetadataCacheImpl getInstance(Configuration conf) {
+ ServerMetadataCacheImpl result = INSTANCE;
+ if (result == null) {
+ synchronized (ServerMetadataCacheImpl.class) {
+ result = INSTANCE;
+ if (result == null) {
+ INSTANCE = result = new ServerMetadataCacheImpl(conf);
+ }
+ }
+ }
+ return result;
+ }
+
+ public ServerMetadataCacheImpl(Configuration conf) {
+ this.conf = conf;
+ this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
+ .getInstance().getMetadataCachingSource();
+ long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
+ DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
+ long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
+ DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
+ lastDDLTimestampMap = CacheBuilder.newBuilder()
+ .removalListener((RemovalListener<ImmutableBytesPtr, Long>)
notification -> {
+ String key = notification.getKey().toString();
+ LOGGER.debug("Expiring " + key + " because of "
+ + notification.getCause().name());
+ })
+ // maximum number of entries this cache can handle.
+ .maximumSize(maxSize)
+ .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ /**
+ * Returns the last DDL timestamp from the table.
+ * If not found in cache, then query SYSCAT regionserver.
+ * @param tenantID tenant id
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return last DDL timestamp
+ * @throws Exception
+ */
+ public long getLastDDLTimestampForTable(byte[] tenantID, byte[]
schemaName, byte[] tableName)
+ throws SQLException {
+ String fullTableNameStr = SchemaUtil.getTableName(schemaName,
tableName);
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName,
tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ // Lookup in cache if present.
+ Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
+ if (lastDDLTimestamp != null) {
+ metricsSource.incrementRegionServerMetadataCacheHitCount();
+ LOGGER.trace("Retrieving last ddl timestamp value from cache for
tableName: {}",
+ fullTableNameStr);
+ return lastDDLTimestamp;
+ }
+ metricsSource.incrementRegionServerMetadataCacheMissCount();
+ PTable table;
+ String tenantIDStr = Bytes.toString(tenantID);
+ if (tenantIDStr == null || tenantIDStr.isEmpty()) {
+ tenantIDStr = null;
+ }
+ Properties properties = new Properties();
+ if (tenantIDStr != null) {
+ properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
+ }
+ try (Connection connection = getConnection(properties)) {
+ // Using PhoenixRuntime#getTableNoCache since se don't want to
read cached value.
+ table = PhoenixRuntime.getTableNoCache(connection,
fullTableNameStr);
+ // TODO PhoenixRuntime#getTableNoCache can throw
TableNotFoundException.
+ // In that case, do we want to throw non retryable exception back
to the client?
+ // Update cache with the latest DDL timestamp from SYSCAT server.
+ lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
+ }
+ return table.getLastDDLTimestamp();
+ }
+
+ /**
+ * Invalidate cache for the given tenantID, schema name and table name.
+ * Guava cache is thread safe so we don't have to synchronize it
explicitly.
+ * @param tenantID tenantID
+ * @param schemaName schemaName
+ * @param tableName tableName
+ */
+ public void invalidate(byte[] tenantID, byte[] schemaName, byte[]
tableName) {
+ String fullTableNameStr = SchemaUtil.getTableName(schemaName,
tableName);
+ LOGGER.debug("Invalidating server metadata cache for tenantID: {},
full table: {}",
+ Bytes.toString(tenantID), fullTableNameStr);
+ byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName,
tableName);
+ ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
+ lastDDLTimestampMap.invalidate(tableKeyPtr);
+ }
+
+ protected Connection getConnection(Properties properties) throws
SQLException {
+ return QueryUtil.getConnectionOnServer(properties, this.conf);
+ }
+
+ protected static void resetCache() {
Review Comment:
Do we need this method here?
##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheImplTest.java:
##########
@@ -121,7 +127,7 @@ public void testCacheForBaseTable() throws Exception {
createTable(conn, tableNameStr, NEVER);
pTable = PhoenixRuntime.getTableNoCache(conn,
tableNameStr);// --> First call to CQSI#getTable
- ServerMetadataCache cache =
ServerMetadataCache.getInstance(config);
+ ServerMetadataCacheTestImpl cache =
ServerMetadataCacheTestImpl.getInstance(config, serverName);
Review Comment:
Instead of calling `ServerMetadataCacheTestImpl.getInstance` directly, we should
obtain the cache via the `findCoprocessor` method, something like this:
```
private ServerMetadataCache getCache() {
String phoenixRegionServerEndpoint =
conf.getValue(REGIONSERVER_COPROCESSOR_CONF_KEY);
assertNotNull(phoenixRegionServerEndpoint);
PhoenixRegionServerEndpoint coproc =
regionServer.getRegionServerCoprocessorHost()
.findCoprocessor(phoenixRegionServerEndpoint.class);
assertNotNull(coproc);
ServerMetadataCache cache = coproc.getServerMetadataCache();
assertNotNull(cache);
return cache;
}
```
##########
phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheImplTest.java:
##########
@@ -76,11 +78,13 @@
import static org.mockito.Mockito.verify;
@Category(ParallelStatsDisabledIT.class)
-public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+public class ServerMetadataCacheImplTest extends ParallelStatsDisabledIT {
Review Comment:
Do we want to rename this class to `ServerMetadataCacheIT` to avoid
confusion between `ServerMetadataCacheImplTest` and
`ServerMetadataCacheTestImpl`?
> Refactor server-side code to support multiple ServerMetadataCache for HA tests
> ------------------------------------------------------------------------------
>
> Key: PHOENIX-7251
> URL: https://issues.apache.org/jira/browse/PHOENIX-7251
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Palash Chauhan
> Assignee: Palash Chauhan
> Priority: Major
>
> In the metadata caching re-design, `ServerMetadataCache` is required to be a
> singleton in the implementation. This affects tests for the HA use case
> because the coprocessors on the 2 clusters end up using the same
> `ServerMetadataCache`. All tests which execute queries with 1 of the clusters
> unavailable will fail.
> We can refactor the implementation in the following way to support HA test
> cases:
> 1. Create a `ServerMetadataCache` interface and use the current
> implementation as `ServerMetadataCacheImpl` for all other tests. This would
> be a singleton.
> 2. Implement `ServerMetadataCacheHAImpl` with a map of instances keyed on
> config.
> 3. Extend `PhoenixRegionServerEndpoint` and use `ServerMetadataCacheHAImpl`.
> 4. In HA tests, load this new endpoint on the region servers.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)