This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 5752adbdca PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should
acquire a readLock instead of an exclusive writeLock (#1919)
5752adbdca is described below
commit 5752adbdca7f8a282494719f469a0502a8d7860f
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jun 27 19:56:08 2024 -0700
PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should acquire a readLock
instead of an exclusive writeLock (#1919)
* non exclusive row lock for doGet apis
* add config
* add test
* change test annotation
* config enabled by default
* add license to test class
---------
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 38 +++---
.../end2end/MetadataGetTableReadLockIT.java | 150 +++++++++++++++++++++
4 files changed, 176 insertions(+), 16 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index df7f669c6c..404bc93be0 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -504,6 +504,8 @@ public interface QueryServices extends SQLCloseable {
int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10;
+ String PHOENIX_GET_METADATA_READ_LOCK_ENABLED =
"phoenix.get.metadata.read.lock.enabled";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6dd7a620b7..abc59b6eb9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -445,6 +445,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN = 5;
public static final boolean DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX =
true;
+ public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED
= true;
+
private final Configuration config;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index d5f4c67ad4..2697b1af3d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -607,6 +607,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// before 4.15, so that we can rollback the upgrade to 4.15 if required
private boolean allowSplittableSystemCatalogRollback;
+ protected boolean getMetadataReadLockEnabled;
+
private MetricsMetadataSource metricsSource;
public static void
setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) {
@@ -647,6 +649,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
this.invalidateServerCacheEnabled
=
config.getBoolean(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED);
+ this.getMetadataReadLockEnabled
+ =
config.getBoolean(QueryServices.PHOENIX_GET_METADATA_READ_LOCK_ENABLED,
+
QueryServicesOptions.DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED);
+
LOGGER.info("Starting Tracing-Metrics Systems");
// Start the phoenix trace collection
Tracing.addTraceMetricsSource();
@@ -2326,7 +2332,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
// Place a lock using key for the table to be created
try {
- acquireLock(region, tableKey, locks);
+ acquireLock(region, tableKey, locks, false);
// If the table key resides outside the region, return without
// doing anything
MetaDataMutationResult result =
checkTableKeyInRegion(tableKey, region);
@@ -2355,7 +2361,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
done.run(builder.build());
return;
}
- acquireLock(region, parentTableKey, locks);
+ acquireLock(region, parentTableKey, locks, false);
}
// make sure we haven't gone over our threshold for
// indexes on this table.
if (execeededIndexQuota(tableType, parentTable)) {
@@ -2882,9 +2888,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
if (parentLockKey != null) {
- acquireLock(region, parentLockKey, locks);
+ acquireLock(region, parentLockKey, locks, false);
}
List<InvalidateServerMetadataCacheRequest> requests = new
ArrayList<>();
requests.add(new
InvalidateServerMetadataCacheRequest(tenantIdBytes, schemaName,
@@ -3028,8 +3034,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
- private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock>
locks) throws IOException {
- RowLock rowLock = region.getRowLock(lockKey, false);
+ protected RowLock acquireLock(Region region, byte[] lockKey, List<RowLock>
locks, boolean readLock) throws IOException {
+ RowLock rowLock = region.getRowLock(lockKey,
this.getMetadataReadLockEnabled && readLock);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " +
Bytes.toStringBinary(lockKey));
}
@@ -3311,7 +3317,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
return mutationResult.get();
}
// We take a write row lock for tenantId, schemaName,
// tableOrViewName
- acquireLock(region, key, locks);
+ acquireLock(region, key, locks, false);
// Invalidate the cache from all the regionservers.
List<InvalidateServerMetadataCacheRequest> requests = new
ArrayList<>();
requests.add(new
InvalidateServerMetadataCacheRequest(tenantId, schemaName,
@@ -3766,7 +3772,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
final boolean wasLocked = (rowLock != null);
try {
if (!wasLocked) {
- rowLock = acquireLock(region, key, null);
+ rowLock = acquireLock(region, key, null, true);
}
PTable table =
getTableFromCache(cacheKey, clientTimeStamp,
clientVersion);
@@ -3819,7 +3825,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
;
try {
for (int i = 0; i < keys.size(); i++) {
- acquireLock(region, keys.get(i), rowLocks);
+ acquireLock(region, keys.get(i), rowLocks, true);
}
List<PFunction> functionsAvailable = new
ArrayList<PFunction>(keys.size());
@@ -3928,7 +3934,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
- acquireLock(region, indexKey, locks);
+ acquireLock(region, indexKey, locks, false);
// invalidate server metadata cache when dropping index
List<InvalidateServerMetadataCacheRequest> requests = new
ArrayList<>();
requests.add(new InvalidateServerMetadataCacheRequest(tenantId,
@@ -4147,7 +4153,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
PIndexState newState =
PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
- RowLock rowLock = acquireLock(region, key, null);
+ RowLock rowLock = acquireLock(region, key, null, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " +
Bytes.toStringBinary(key));
}
@@ -4472,7 +4478,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
try {
getCoprocessorHost().preGetSchema(schemaName);
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
// newer schema that already
// exists without making an additional query
@@ -4576,7 +4582,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
// newer function that already
// exists without making an additional query
ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
@@ -4652,7 +4658,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
List<byte[]> keys = new ArrayList<byte[]>(1);
keys.add(lockKey);
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>();
@@ -4760,7 +4766,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(schemaMutations);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
// Get as of latest timestamp so we can detect if we have a
// newer schema that already exists without
// making an additional query
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
@@ -4827,7 +4833,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp =
MetaDataUtil.getClientTimeStamp(schemaMetaData);
try {
- acquireLock(region, lockKey, locks);
+ acquireLock(region, lockKey, locks, false);
List<ImmutableBytesPtr> invalidateList = new
ArrayList<ImmutableBytesPtr>(1);
result = doDropSchema(clientTimeStamp, schemaName, lockKey,
schemaMetaData, invalidateList);
if (result.getMutationCode() !=
MutationCode.SCHEMA_ALREADY_EXISTS) {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
new file mode 100644
index 0000000000..7fd98801e2
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Class which tests whether non-exclusive locking on the metadata read path
+ * (getTable) works as expected.
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class MetadataGetTableReadLockIT extends BaseTest {
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Disable system task handling
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ /**
+ * Create 2 threads which query a table.
+ * Thread-1 sleeps in the getTable path after acquiring a non-exclusive
+ * read lock.
+ * Thread-2 should not wait to acquire a lock for its query.
+ */
+ @Test
+ public void testBlockedReadDoesNotBlockAnotherRead() throws Exception {
+ long SLEEP_DURATION = 5000L;
+ String tableName = generateUniqueName();
+ CountDownLatch sleepSignal = new CountDownLatch(1);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // create table
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2
INTEGER)");
+
+ // load custom coproc which supports blocking
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ BlockingMetaDataEndpointImpl.setSleepSignal(sleepSignal);
+ BlockingMetaDataEndpointImpl.setSleepDuration(SLEEP_DURATION);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
BlockingMetaDataEndpointImpl.class);
+
+ // start thread-1 and wait for signal before it starts sleeping
+ Thread t1 = getQueryThread(tableName);
+ Thread t2 = getQueryThread(tableName);
+ t1.start();
+ sleepSignal.await();
+
+ // we don't want thread-2 to sleep at all
+ BlockingMetaDataEndpointImpl.setSleepDuration(0);
+ long start = System.currentTimeMillis();
+ t2.start();
+ t2.join();
+ long end = System.currentTimeMillis();
+ t1.join();
+
+ // if thread-2 did not wait to acquire the lock, it will finish in less
+ // time than thread-1's SLEEP_DURATION
+ Assert.assertTrue("Second thread should not have been blocked by
the first thread.",
+ end-start < SLEEP_DURATION);
+ }
+ }
+
+ private static Thread getQueryThread(String tableName) {
+ Runnable runnable = () -> {
+ try (Connection conn1 = DriverManager.getConnection(getUrl())) {
+ conn1.createStatement().execute("SELECT * FROM " + tableName +
" LIMIT 1");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ Thread t = new Thread(runnable);
+ return t;
+ }
+
+ /**
+ * Extends MetaDataEndpointImpl with support for blocking after acquiring
+ * the lock.
+ */
+ public static class BlockingMetaDataEndpointImpl extends
MetaDataEndpointImpl {
+
+ private static volatile long sleepDuration;
+ private static CountDownLatch sleepSignal;
+
+ public static void setSleepDuration(long sleepDuration) {
+ BlockingMetaDataEndpointImpl.sleepDuration = sleepDuration;
+ }
+
+ public static void setSleepSignal(CountDownLatch sleepSignal) {
+ BlockingMetaDataEndpointImpl.sleepSignal = sleepSignal;
+ }
+
+ @Override
+ protected Region.RowLock acquireLock(Region region, byte[] lockKey,
List<Region.RowLock> locks, boolean readLock) throws IOException {
+ Region.RowLock rowLock = region.getRowLock(lockKey,
this.getMetadataReadLockEnabled && readLock);
+ if (rowLock == null) {
+ throw new IOException("Failed to acquire lock on " +
Bytes.toStringBinary(lockKey));
+ }
+ sleepSignal.countDown();
+ if (locks != null) {
+ locks.add(rowLock);
+ }
+ try {
+ Thread.sleep(sleepDuration);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return rowLock;
+ }
+ }
+
+ @AfterClass
+ public static synchronized void cleanup() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
BlockingMetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ }
+ }
+}