This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 273086bd21 PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should
acquire a readLock instead of an exclusive writeLock (#1919) (#1921)
273086bd21 is described below
commit 273086bd21e365ce2378c25338781b2e398cb0b1
Author: Palash Chauhan <[email protected]>
AuthorDate: Sun Jun 30 18:48:38 2024 -0700
PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should acquire a readLock
instead of an exclusive writeLock (#1919) (#1921)
* non-exclusive row lock for doGet APIs
* add config
* add test
* change test annotation
* config enabled by default
* add license to test class
---------
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 39 +++---
.../end2end/MetadataGetTableReadLockIT.java | 150 +++++++++++++++++++++
4 files changed, 176 insertions(+), 17 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0279377d5a..42c86171fe 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -486,6 +486,8 @@ public interface QueryServices extends SQLCloseable {
int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10;
+ String PHOENIX_GET_METADATA_READ_LOCK_ENABLED =
"phoenix.get.metadata.read.lock.enabled";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3dac05b8cd..041b4df1fd 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -425,6 +425,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN = 5;
public static final boolean DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX =
true;
+ public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED
= true;
+
private final Configuration config;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4c60dccb26..6587534254 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -586,6 +586,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// before 4.15, so that we can rollback the upgrade to 4.15 if required
private boolean allowSplittableSystemCatalogRollback;
+ protected boolean getMetadataReadLockEnabled;
+
private MetricsMetadataSource metricsSource;
public static void
setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) {
@@ -623,6 +625,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
new ReadOnlyProps(config.iterator()));
this.allowSplittableSystemCatalogRollback =
config.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
+ this.getMetadataReadLockEnabled
+ =
config.getBoolean(QueryServices.PHOENIX_GET_METADATA_READ_LOCK_ENABLED,
+
QueryServicesOptions.DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED);
LOGGER.info("Starting Tracing-Metrics Systems");
// Start the phoenix trace collection
@@ -2218,7 +2223,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
// Place a lock using key for the table to be created
try {
- acquireLock(region, tableKey, locks);
+ acquireLock(region, tableKey, locks, false);
// If the table key resides outside the region, return without
doing anything
MetaDataMutationResult result =
checkTableKeyInRegion(tableKey, region);
@@ -2247,7 +2252,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
done.run(builder.build());
return;
}
- acquireLock(region, parentTableKey, locks);
+ acquireLock(region, parentTableKey, locks, false);
}
// make sure we haven't gone over our threshold for
indexes on this table.
if (execeededIndexQuota(tableType, parentTable)) {
@@ -2755,9 +2760,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
if (parentLockKey != null) {
- acquireLock(region, parentLockKey, locks);
+ acquireLock(region, parentLockKey, locks, false);
}
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>();
@@ -2889,8 +2894,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
- private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock>
locks) throws IOException {
- RowLock rowLock = region.getRowLock(lockKey, false);
+ protected RowLock acquireLock(Region region, byte[] lockKey, List<RowLock>
locks, boolean readLock) throws IOException {
+ RowLock rowLock = region.getRowLock(lockKey,
this.getMetadataReadLockEnabled && readLock);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " +
Bytes.toStringBinary(lockKey));
}
@@ -3171,8 +3176,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (mutationResult.isPresent()) {
return mutationResult.get();
}
-
- acquireLock(region, key, locks);
+ // We take a write row lock for tenantId, schemaName,
tableOrViewName
+ acquireLock(region, key, locks, false);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
@@ -3574,7 +3579,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
final boolean wasLocked = (rowLock != null);
try {
if (!wasLocked) {
- rowLock = acquireLock(region, key, null);
+ rowLock = acquireLock(region, key, null, true);
}
PTable table =
getTableFromCache(cacheKey, clientTimeStamp,
clientVersion);
@@ -3625,7 +3630,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
;
try {
for (int i = 0; i < keys.size(); i++) {
- acquireLock(region, keys.get(i), rowLocks);
+ acquireLock(region, keys.get(i), rowLocks, true);
}
List<PFunction> functionsAvailable = new
ArrayList<PFunction>(keys.size());
@@ -3736,7 +3741,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
- acquireLock(region, indexKey, locks);
+ acquireLock(region, indexKey, locks, false);
List<Mutation> childLinksMutations = Lists.newArrayList();
MetaDataMutationResult result = doDropTable(indexKey, tenantId,
index.getSchemaName().getBytes(),
index.getTableName().getBytes(),
@@ -3948,7 +3953,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
PIndexState newState =
PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
- RowLock rowLock = acquireLock(region, key, null);
+ RowLock rowLock = acquireLock(region, key, null, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " +
Bytes.toStringBinary(key));
}
@@ -4269,7 +4274,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
try {
getCoprocessorHost().preGetSchema(schemaName);
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
// newer schema that already
// exists without making an additional query
@@ -4373,7 +4378,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
newer function that already
// exists without making an additional query
ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
@@ -4449,7 +4454,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
List<byte[]> keys = new ArrayList<byte[]>(1);
keys.add(lockKey);
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>();
@@ -4557,7 +4562,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(schemaMutations);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
newer schema that already exists without
// making an additional query
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
@@ -4624,7 +4629,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(schemaMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>(1);
result = doDropSchema(clientTimeStamp, schemaName, lockKey,
schemaMetaData, invalidateList);
if (result.getMutationCode() !=
MutationCode.SCHEMA_ALREADY_EXISTS) {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
new file mode 100644
index 0000000000..7fd98801e2
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that the metadata read path (getTable) takes a non-exclusive row lock, so a reader
+ * that is stalled while holding the lock does not block another reader of the same table.
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class MetadataGetTableReadLockIT extends BaseTest {
+
+    /** How long the first reader holds its row lock, in milliseconds. */
+    private static final long SLEEP_DURATION_MS = 5000L;
+
+    /** Upper bound on how long to wait for the first reader to reach a row lock. */
+    private static final long LOCK_WAIT_TIMEOUT_SECONDS = 60L;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Disable system task handling
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+            Long.toString(Long.MAX_VALUE));
+        setUpTestDriver(new ReadOnlyProps(props));
+    }
+
+    /**
+     * Runs two queries against the same table.
+     * Query-1 sleeps in the getTable path after acquiring a non-exclusive read lock.
+     * Query-2 should not wait for that lock, so it must complete before query-1's sleep ends.
+     */
+    @Test
+    public void testBlockedReadDoesNotBlockAnotherRead() throws Exception {
+        String tableName = generateUniqueName();
+        CountDownLatch sleepSignal = new CountDownLatch(1);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // create table
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)");
+
+            // load custom coproc which supports blocking
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+            BlockingMetaDataEndpointImpl.setSleepSignal(sleepSignal);
+            BlockingMetaDataEndpointImpl.setSleepDuration(SLEEP_DURATION_MS);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", BlockingMetaDataEndpointImpl.class);
+
+            // start query-1 and wait until it holds a row lock and is about to sleep
+            FutureTask<Void> firstQuery = newQueryTask(tableName);
+            FutureTask<Void> secondQuery = newQueryTask(tableName);
+            new Thread(firstQuery).start();
+            if (!sleepSignal.await(LOCK_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                if (firstQuery.isDone()) {
+                    // rethrows the reason query-1 finished before reaching the lock
+                    firstQuery.get();
+                }
+                Assert.fail("First query did not acquire a metadata row lock in time");
+            }
+
+            // we don't want query-2 to sleep at all; query-1 read its duration before signalling
+            BlockingMetaDataEndpointImpl.setSleepDuration(0);
+            long start = System.nanoTime();
+            new Thread(secondQuery).start();
+            // get() rethrows any query failure, so a query that errors out quickly cannot pass
+            secondQuery.get();
+            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            firstQuery.get();
+
+            // if query-2 did not wait for the lock, it finishes before query-1's sleep ends
+            Assert.assertTrue("Second thread should not have been blocked by the first thread.",
+                elapsedMs < SLEEP_DURATION_MS);
+        }
+    }
+
+    /**
+     * Creates a task that opens its own connection and runs a simple query on the table,
+     * which is expected to trigger a getTable call on SYSTEM.CATALOG. Any failure is
+     * reported to the caller through {@link FutureTask#get()}.
+     */
+    private static FutureTask<Void> newQueryTask(String tableName) {
+        return new FutureTask<>(() -> {
+            try (Connection queryConn = DriverManager.getConnection(getUrl());
+                 Statement stmt = queryConn.createStatement()) {
+                stmt.execute("SELECT * FROM " + tableName + " LIMIT 1");
+            }
+            return null;
+        });
+    }
+
+    /**
+     * MetaDataEndpointImpl that, after acquiring a metadata row lock through the production
+     * code path, signals a latch and then sleeps while still holding the lock.
+     */
+    public static class BlockingMetaDataEndpointImpl extends MetaDataEndpointImpl {
+
+        // Both hooks are written by the test thread and read by RPC handler threads.
+        private static volatile long sleepDuration;
+        private static volatile CountDownLatch sleepSignal;
+
+        public static void setSleepDuration(long sleepDuration) {
+            BlockingMetaDataEndpointImpl.sleepDuration = sleepDuration;
+        }
+
+        public static void setSleepSignal(CountDownLatch sleepSignal) {
+            BlockingMetaDataEndpointImpl.sleepSignal = sleepSignal;
+        }
+
+        @Override
+        protected Region.RowLock acquireLock(Region region, byte[] lockKey,
+                List<Region.RowLock> locks, boolean readLock) throws IOException {
+            // Delegate so the test exercises the real read/write lock-mode decision.
+            Region.RowLock rowLock = super.acquireLock(region, lockKey, locks, readLock);
+            // Read the duration BEFORE signalling: once the latch opens, the test thread
+            // resets sleepDuration to 0, and reading it afterwards could skip the sleep.
+            long sleepMs = sleepDuration;
+            CountDownLatch signal = sleepSignal;
+            if (signal != null) {
+                signal.countDown();
+            }
+            if (sleepMs > 0) {
+                try {
+                    Thread.sleep(sleepMs);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return rowLock;
+        }
+    }
+
+    @AfterClass
+    public static synchronized void cleanup() throws Exception {
+        // Reset the static hooks first so calls made while the coprocessor is being swapped
+        // out neither sleep nor count down a stale latch.
+        BlockingMetaDataEndpointImpl.setSleepDuration(0);
+        BlockingMetaDataEndpointImpl.setSleepSignal(null);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
+                BlockingMetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+        }
+    }
+}