This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5752adbdca PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should acquire a readLock instead of an exclusive writeLock (#1919)
5752adbdca is described below

commit 5752adbdca7f8a282494719f469a0502a8d7860f
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jun 27 19:56:08 2024 -0700

    PHOENIX-6066 : MetaDataEndpointImpl.doGetTable should acquire a readLock instead of an exclusive writeLock (#1919)
    
    * non exclusive row lock for doGet apis
    
    * add config
    
    * add test
    
    * change test annotation
    
    * config enabled by default
    
    * add license to test class
    
    ---------
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  38 +++---
 .../end2end/MetadataGetTableReadLockIT.java        | 150 +++++++++++++++++++++
 4 files changed, 176 insertions(+), 16 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index df7f669c6c..404bc93be0 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -504,6 +504,8 @@ public interface QueryServices extends SQLCloseable {
 
     int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10;
 
+    String PHOENIX_GET_METADATA_READ_LOCK_ENABLED = 
"phoenix.get.metadata.read.lock.enabled";
+
     /**
      * Get executor service used for parallel scans
      */
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6dd7a620b7..abc59b6eb9 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -445,6 +445,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN = 5;
     public static final boolean DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX = 
true;
 
+    public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED 
= true;
+
 
     private final Configuration config;
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index d5f4c67ad4..2697b1af3d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -607,6 +607,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     // before 4.15, so that we can rollback the upgrade to 4.15 if required
     private boolean allowSplittableSystemCatalogRollback;
 
+    protected boolean getMetadataReadLockEnabled;
+
     private MetricsMetadataSource metricsSource;
 
     public static void 
setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) {
@@ -647,6 +649,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         this.invalidateServerCacheEnabled
                 = 
config.getBoolean(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
                         
QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED);
+        this.getMetadataReadLockEnabled
+                = 
config.getBoolean(QueryServices.PHOENIX_GET_METADATA_READ_LOCK_ENABLED,
+                            
QueryServicesOptions.DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED);
+
         LOGGER.info("Starting Tracing-Metrics Systems");
         // Start the phoenix trace collection
         Tracing.addTraceMetricsSource();
@@ -2326,7 +2332,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             List<RowLock> locks = Lists.newArrayList();
             // Place a lock using key for the table to be created
             try {
-                acquireLock(region, tableKey, locks);
+                acquireLock(region, tableKey, locks, false);
 
                 // If the table key resides outside the region, return without 
doing anything
                 MetaDataMutationResult result = 
checkTableKeyInRegion(tableKey, region);
@@ -2355,7 +2361,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             done.run(builder.build());
                             return;
                         }
-                        acquireLock(region, parentTableKey, locks);
+                        acquireLock(region, parentTableKey, locks, false);
                     }
                     // make sure we haven't gone over our threshold for 
indexes on this table.
                     if (execeededIndexQuota(tableType, parentTable)) {
@@ -2882,9 +2888,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
             List<RowLock> locks = Lists.newArrayList();
             try {
-                acquireLock(region, lockKey, locks);
+                acquireLock(region, lockKey, locks, false);
                 if (parentLockKey != null) {
-                    acquireLock(region, parentLockKey, locks);
+                    acquireLock(region, parentLockKey, locks, false);
                 }
                 List<InvalidateServerMetadataCacheRequest> requests = new 
ArrayList<>();
                 requests.add(new 
InvalidateServerMetadataCacheRequest(tenantIdBytes, schemaName,
@@ -3028,8 +3034,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
-    private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> 
locks) throws IOException {
-        RowLock rowLock = region.getRowLock(lockKey, false);
+    protected RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> 
locks, boolean readLock) throws IOException {
+        RowLock rowLock = region.getRowLock(lockKey, 
this.getMetadataReadLockEnabled && readLock);
         if (rowLock == null) {
             throw new IOException("Failed to acquire lock on " + 
Bytes.toStringBinary(lockKey));
         }
@@ -3311,7 +3317,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     return mutationResult.get();
                 }
                 // We take a write row lock for tenantId, schemaName, 
tableOrViewName
-                acquireLock(region, key, locks);
+                acquireLock(region, key, locks, false);
                 // Invalidate the cache from all the regionservers.
                 List<InvalidateServerMetadataCacheRequest> requests = new 
ArrayList<>();
                 requests.add(new 
InvalidateServerMetadataCacheRequest(tenantId, schemaName,
@@ -3766,7 +3772,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         final boolean wasLocked = (rowLock != null);
         try {
             if (!wasLocked) {
-                rowLock = acquireLock(region, key, null);
+                rowLock = acquireLock(region, key, null, true);
             }
             PTable table =
                     getTableFromCache(cacheKey, clientTimeStamp, 
clientVersion);
@@ -3819,7 +3825,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         ;
         try {
             for (int i = 0; i < keys.size(); i++) {
-                acquireLock(region, keys.get(i), rowLocks);
+                acquireLock(region, keys.get(i), rowLocks, true);
             }
 
             List<PFunction> functionsAvailable = new 
ArrayList<PFunction>(keys.size());
@@ -3928,7 +3934,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 // Since we're dropping the index, lock it to ensure
                 // that a change in index state doesn't
                 // occur while we're dropping it.
-                acquireLock(region, indexKey, locks);
+                acquireLock(region, indexKey, locks, false);
                 // invalidate server metadata cache when dropping index
                 List<InvalidateServerMetadataCacheRequest> requests = new 
ArrayList<>();
                 requests.add(new InvalidateServerMetadataCacheRequest(tenantId,
@@ -4147,7 +4153,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             }
             PIndexState newState =
                     
PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
-            RowLock rowLock = acquireLock(region, key, null);
+            RowLock rowLock = acquireLock(region, key, null, false);
             if (rowLock == null) {
                 throw new IOException("Failed to acquire lock on " + 
Bytes.toStringBinary(key));
             }
@@ -4472,7 +4478,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         List<RowLock> locks = Lists.newArrayList();
         try {
             getCoprocessorHost().preGetSchema(schemaName);
-            acquireLock(region, lockKey, locks);
+            acquireLock(region, lockKey, locks, false);
             // Get as of latest timestamp so we can detect if we have a
             // newer schema that already
             // exists without making an additional query
@@ -4576,7 +4582,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             List<RowLock> locks = Lists.newArrayList();
             long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
             try {
-                acquireLock(region, lockKey, locks);
+                acquireLock(region, lockKey, locks, false);
                 // Get as of latest timestamp so we can detect if we have a 
newer function that already
                 // exists without making an additional query
                 ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
@@ -4652,7 +4658,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             List<RowLock> locks = Lists.newArrayList();
             long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
             try {
-                acquireLock(region, lockKey, locks);
+                acquireLock(region, lockKey, locks, false);
                 List<byte[]> keys = new ArrayList<byte[]>(1);
                 keys.add(lockKey);
                 List<ImmutableBytesPtr> invalidateList = new 
ArrayList<ImmutableBytesPtr>();
@@ -4760,7 +4766,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             List<RowLock> locks = Lists.newArrayList();
             long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(schemaMutations);
             try {
-                acquireLock(region, lockKey, locks);
+                acquireLock(region, lockKey, locks, false);
                 // Get as of latest timestamp so we can detect if we have a 
newer schema that already exists without
                 // making an additional query
                 ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
@@ -4827,7 +4833,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             List<RowLock> locks = Lists.newArrayList();
             long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(schemaMetaData);
             try {
-                acquireLock(region, lockKey, locks);
+                acquireLock(region, lockKey, locks, false);
                 List<ImmutableBytesPtr> invalidateList = new 
ArrayList<ImmutableBytesPtr>(1);
                 result = doDropSchema(clientTimeStamp, schemaName, lockKey, 
schemaMetaData, invalidateList);
                 if (result.getMutationCode() != 
MutationCode.SCHEMA_ALREADY_EXISTS) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
new file mode 100644
index 0000000000..7fd98801e2
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataGetTableReadLockIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Class which tests whether non-exclusive locking on metadata read path 
(getTable) works as expected.
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class MetadataGetTableReadLockIT extends BaseTest {
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Disable system task handling
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(Long.MAX_VALUE));
+        setUpTestDriver(new ReadOnlyProps(props));
+    }
+
+    /**
+     * Create 2 threads which query a table.
+     * Thread-1 sleeps in the getTable path after acquiring a non-exclusive 
read lock.
+     * Thread-2 should not wait to acquire a lock for its query.
+     */
+    @Test
+    public void testBlockedReadDoesNotBlockAnotherRead() throws Exception {
+        long SLEEP_DURATION = 5000L;
+        String tableName = generateUniqueName();
+        CountDownLatch sleepSignal = new CountDownLatch(1);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // create table
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 
INTEGER)");
+
+            // load custom coproc which supports blocking
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
MetaDataEndpointImpl.class);
+            BlockingMetaDataEndpointImpl.setSleepSignal(sleepSignal);
+            BlockingMetaDataEndpointImpl.setSleepDuration(SLEEP_DURATION);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", 
BlockingMetaDataEndpointImpl.class);
+
+            // start thread-1 and wait for signal before it starts sleeping
+            Thread t1 = getQueryThread(tableName);
+            Thread t2 = getQueryThread(tableName);
+            t1.start();
+            sleepSignal.await();
+
+            // we don't want thread-2 to sleep at all
+            BlockingMetaDataEndpointImpl.setSleepDuration(0);
+            long start = System.currentTimeMillis();
+            t2.start();
+            t2.join();
+            long end = System.currentTimeMillis();
+            t1.join();
+
+            // if thread-2 did not wait to acquire lock, it will finish faster 
than thread-1's SLEEP_DURATION
+            Assert.assertTrue("Second thread should not have been blocked by 
the first thread.",
+                    end-start < SLEEP_DURATION);
+        }
+    }
+
+    private static Thread getQueryThread(String tableName) {
+        Runnable runnable = () -> {
+            try (Connection conn1 = DriverManager.getConnection(getUrl())) {
+                conn1.createStatement().execute("SELECT * FROM " + tableName + 
" LIMIT 1");
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+        Thread t = new Thread(runnable);
+        return t;
+    }
+
+    /**
+     * Extend MetaDataEndpointImpl with support for blocking after acquiring 
lock.
+     */
+    public static class BlockingMetaDataEndpointImpl extends 
MetaDataEndpointImpl {
+
+        private static volatile long sleepDuration;
+        private static CountDownLatch sleepSignal;
+
+        public static void setSleepDuration(long sleepDuration) {
+            BlockingMetaDataEndpointImpl.sleepDuration = sleepDuration;
+        }
+
+        public static void setSleepSignal(CountDownLatch sleepSignal) {
+            BlockingMetaDataEndpointImpl.sleepSignal = sleepSignal;
+        }
+
+        @Override
+        protected Region.RowLock acquireLock(Region region, byte[] lockKey, 
List<Region.RowLock> locks, boolean readLock) throws IOException {
+            Region.RowLock rowLock = region.getRowLock(lockKey, 
this.getMetadataReadLockEnabled && readLock);
+            if (rowLock == null) {
+                throw new IOException("Failed to acquire lock on " + 
Bytes.toStringBinary(lockKey));
+            }
+            sleepSignal.countDown();
+            if (locks != null) {
+                locks.add(rowLock);
+            }
+            try {
+                Thread.sleep(sleepDuration);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return rowLock;
+        }
+    }
+
+    @AfterClass
+    public static synchronized void cleanup() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
BlockingMetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", 
MetaDataEndpointImpl.class);
+        }
+    }
+}

Reply via email to