This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
     new db203e2b36 PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
db203e2b36 is described below
commit db203e2b36fff2a070007f8baea5bc1f3e35ab77
Author: Viraj Jasani <[email protected]>
AuthorDate: Sun Jul 28 09:51:26 2024 -0800
    PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
---
 .../org/apache/phoenix/query/QueryServices.java    |  9 ++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 52 +++++++++++++++++-----
 2 files changed, 51 insertions(+), 10 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 42c86171fe..79fadf603e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -488,6 +488,15 @@ public interface QueryServices extends SQLCloseable {
     String PHOENIX_GET_METADATA_READ_LOCK_ENABLED =
             "phoenix.get.metadata.read.lock.enabled";
 
+    /**
+     * If server side metadata cache is empty, take Phoenix writeLock for the given row
+     * and make sure we can acquire the writeLock within the configurable duration.
+     */
+    String PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT =
+            "phoenix.metadata.update.rowlock.timeout";
+
+    long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
+
     /**
      * Get executor service used for parallel scans
      */
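The new timeout is a server-side setting, so it would be tuned in hbase-site.xml on the region servers like other Phoenix properties. A sketch of such an override (the property name and its 60000 ms default come from the diff above; the 30000 ms value is purely illustrative):

    <!-- Illustrative only: lower the metadata cache update row-lock wait
         from the 60000 ms default to 30 seconds. -->
    <property>
      <name>phoenix.metadata.update.rowlock.timeout</name>
      <value>30000</value>
    </property>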
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a2d57789cb..a10606bd1e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -182,6 +182,7 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
+import org.apache.phoenix.hbase.index.LockManager;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -312,6 +313,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     private static final byte[] PHYSICAL_TABLE_BYTES =
             new byte[]{PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
 
+    private LockManager lockManager;
+    private long metadataCacheRowLockTimeout;
+
     // KeyValues for Table
     private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
@@ -615,8 +619,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             throw new CoprocessorException("Must be loaded on a table region!");
         }
+        this.lockManager = new LockManager();
         phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);
         Configuration config = env.getConfiguration();
+        this.metadataCacheRowLockTimeout =
+                config.getLong(QueryServices.PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT,
+                        QueryServices.DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT);
         this.accessCheckEnabled = config.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
         this.blockWriteRebuildIndex = config.getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
@@ -735,6 +743,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable newTable;
+        region.startRegionOperation();
         try (RegionScanner scanner = region.getScanner(scan)) {
             PTable oldTable = (PTable) metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
@@ -752,6 +761,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             }
             metaDataCache.put(cacheKey, newTable);
         }
+        } finally {
+            region.closeRegionOperation();
         }
         return newTable;
     }
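Read together, the two hunks above bracket the catalog scan in buildTable with a region operation. A condensed sketch of the resulting shape (assuming HBase's Region API, whose startRegionOperation/closeRegionOperation hold off a region close or split while the scan runs):

    region.startRegionOperation();  // block region close/split during the scan
    try (RegionScanner scanner = region.getScanner(scan)) {
        // scan SYSTEM.CATALOG rows and repopulate the metadata cache
    } finally {
        region.closeRegionOperation();  // always released, even if the scan fails
    }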
@@ -3596,8 +3607,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     rowLock = acquireLock(region, key, null, true);
                 }
                 PTable table =
-                        getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
-                table = modifyIndexStateForOldClient(clientVersion, table);
+                        getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion, cacheKey);
                 // We only cache the latest, so we'll end up building the table with every call if the
                 // client connection has specified an SCN.
                 // TODO: If we indicate to the client that we're returning an older version, but there's a
@@ -3610,22 +3620,44 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 return table;
             }
-            // Query for the latest table first, since it's not cached
-            table =
+            // take Phoenix row level write-lock as we need to protect metadata cache update
+            // after scanning SYSTEM.CATALOG to retrieve the PTable object
+            LockManager.RowLock phoenixRowLock =
+                    lockManager.lockRow(key, this.metadataCacheRowLockTimeout);
+            try {
+                table = getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion,
+                        cacheKey);
+                if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                    if (isTableDeleted(table)) {
+                        return null;
+                    }
+                    return table;
+                }
+                // Query for the latest table first, since it's not cached
+                table =
                     buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
-            if ((table != null && table.getTimeStamp() <= clientTimeStamp) ||
-                    (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                if ((table != null && table.getTimeStamp() <= clientTimeStamp) || (
+                        blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                    return table;
+                }
+                // Otherwise, query for an older version of the table - it won't be cached
+                table = buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
                 return table;
+            } finally {
+                phoenixRowLock.release();
             }
-            // Otherwise, query for an older version of the table - it won't be cached
-            table =
-                    buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
-            return table;
         } finally {
             if (!wasLocked && rowLock != null) rowLock.release();
         }
     }
+    private PTable getTableFromCacheWithModifiedIndexState(long clientTimeStamp, int clientVersion,
+            ImmutableBytesPtr cacheKey) throws SQLException {
+        PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
+        table = modifyIndexStateForOldClient(clientVersion, table);
+        return table;
+    }
+
     private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
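Taken as a whole, doGetTable now follows a classic double-checked pattern: consult the cache, and only on a miss take a per-row write lock with a bounded wait, re-check the cache under the lock, then scan SYSTEM.CATALOG and repopulate. A minimal, self-contained sketch of that pattern (illustrative names only, not Phoenix's LockManager API; the bounded wait mirrors phoenix.metadata.update.rowlock.timeout):

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class RowLockedCache<K, V> {
        private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
        private final ConcurrentMap<K, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
        private final long lockTimeoutMs;

        public RowLockedCache(long lockTimeoutMs) {
            this.lockTimeoutMs = lockTimeoutMs;
        }

        /** Returns the cached value, rebuilding it under the row lock if absent. */
        public V get(K key, Loader<K, V> loader) throws IOException {
            V value = cache.get(key);
            if (value != null) {
                return value; // fast path: cache hits never take the lock
            }
            ReentrantLock lock = rowLocks.computeIfAbsent(key, k -> new ReentrantLock());
            try {
                // Bound the wait, mirroring the configurable row-lock timeout.
                if (!lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
                    throw new IOException("Timed out waiting for row lock on " + key);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted waiting for row lock on " + key, e);
            }
            try {
                // Re-check under the lock: another thread may have rebuilt it already.
                value = cache.get(key);
                if (value == null) {
                    value = loader.load(key); // e.g. scan the catalog and build the object
                    cache.put(key, value);
                }
                return value;
            } finally {
                lock.unlock();
            }
        }

        public interface Loader<K, V> {
            V load(K key) throws IOException;
        }
    }

The fast path never locks on a cache hit, and the bounded tryLock keeps one wedged reader from stalling metadata lookups indefinitely, which is the failure mode the configurable timeout in this commit guards against.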