This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new ec15049d4d PHOENIX-6884 Phoenix to use hbase.rpc.read.timeout and hbase.rpc.writ… (#1566)
ec15049d4d is described below

commit ec15049d4d8a9e2016079691546f92613f1d8c50
Author: kadirozde <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Wed Feb 22 09:56:11 2023 -0800

    PHOENIX-6884 Phoenix to use hbase.rpc.read.timeout and hbase.rpc.writ… (#1566)
---
 .../coprocessor/MetaDataRegionObserver.java        |  4 ++
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  4 ++
 .../phoenix/mapreduce/index/IndexUpgradeTool.java  |  3 ++
 .../java/org/apache/phoenix/util/ScanUtil.java     | 43 +++++++++++++++-------
 .../java/org/apache/phoenix/query/BaseTest.java    |  7 +---
 5 files changed, 42 insertions(+), 19 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 7691aceb50..350cd3f7dc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -636,6 +636,10 @@ public class MetaDataRegionObserver implements 
RegionObserver,RegionCoprocessor
                 Long.toString(indexRebuildClientScannerTimeOutMs));
             props.setProperty(HConstants.HBASE_RPC_TIMEOUT_KEY,
                 Long.toString(indexRebuildRPCTimeoutMs));
+            props.setProperty(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+                    Long.toString(indexRebuildRPCTimeoutMs));
+            props.setProperty(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+                    Long.toString(indexRebuildRPCTimeoutMs));
             props.setProperty(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                 Long.toString(indexRebuildRpcRetriesCounter));
             // don't run a second index populations upsert select
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9dd384a024..fe86c59171 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -719,6 +719,10 @@ public class IndexTool extends Configured implements Tool {
                     Long.toString(indexRebuildClientScannerTimeOutMs));
             configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
                     Long.toString(indexRebuildRPCTimeoutMs));
+            configuration.set(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+                    Long.toString(indexRebuildRPCTimeoutMs));
+            configuration.set(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+                    Long.toString(indexRebuildRPCTimeoutMs));
             configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                     Long.toString(indexRebuildRpcRetriesCounter));
             configuration.set("mapreduce.task.timeout", 
Long.toString(indexRebuildQueryTimeoutMs));
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index f42d7056bc..de6d04b118 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -354,6 +354,7 @@ public class IndexUpgradeTool extends Configured implements 
Tool {
         long indexRebuildRPCTimeoutMs =
                 conf.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB,
                         
QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT);
+
         long indexRebuildClientScannerTimeOutMs =
                 
conf.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB,
                         
QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT);
@@ -364,6 +365,8 @@ public class IndexUpgradeTool extends Configured implements 
Tool {
         // Set phoenix and hbase level timeouts and rpc retries
         conf.setLong(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, 
indexRebuildQueryTimeoutMs);
         conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
indexRebuildRPCTimeoutMs);
+        conf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, 
indexRebuildRPCTimeoutMs);
+        conf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 
indexRebuildRPCTimeoutMs);
         conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
                 indexRebuildClientScannerTimeOutMs);
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
indexRebuildRpcRetriesCounter);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 901797c79e..d4bb74a611 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1246,6 +1246,32 @@ public class ScanUtil {
         }
     }
 
+    public static Long getRPCReadTimeout(ReadOnlyProps props) {
+        if (props.get(HConstants.HBASE_RPC_READ_TIMEOUT_KEY) != null) {
+            return props.getLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        } else {
+            return props.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        }
+    }
+
+    public static Long getPageSizeInMs(ReadOnlyProps props) {
+        if 
(props.getBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB,
+                QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) {
+            long pageSizeMs = 
props.getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
+            if (pageSizeMs == -1) {
+                // Use the half of the HBase RPC read timeout value as the 
server page size to
+                // make sure that the HBase region server will be able to send 
a heartbeat
+                // message to the client before the client times out
+                return getRPCReadTimeout(props) / 2;
+            } else {
+                return pageSizeMs;
+            }
+        }
+        return null;
+    }
+
     public static void setScanAttributesForClient(Scan scan, PTable table,
                                                   PhoenixConnection 
phoenixConnection) throws SQLException {
         setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
@@ -1255,20 +1281,11 @@ public class ScanUtil {
         if (emptyCF != null && emptyCQ != null) {
             addEmptyColumnToScan(scan, emptyCF, emptyCQ);
         }
-        if (phoenixConnection.getQueryServices().getProps().getBoolean(
-                QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB,
-                QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) {
-            long pageSizeMs = phoenixConnection.getQueryServices().getProps()
-                    .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
-            if (pageSizeMs == -1) {
-                // Use the half of the HBase RPC timeout value as the the 
server page size to make sure that the HBase
-                // region server will be able to send a heartbeat message to 
the client before the client times out
-                pageSizeMs = (long) 
(phoenixConnection.getQueryServices().getProps()
-                        .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
-            }
-            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, 
Bytes.toBytes(Long.valueOf(pageSizeMs)));
+        Long pageSizeMs = 
getPageSizeInMs(phoenixConnection.getQueryServices().getProps());
+        if (pageSizeMs != null) {
+            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
+                    Bytes.toBytes(pageSizeMs));
         }
-
     }
 
     public static void getDummyResult(byte[] rowKey, List<Cell> result) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7436092ec9..8d81164b38 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -627,12 +627,7 @@ public abstract class BaseTest {
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
-        // This results in processing one row at a time in each next operation 
of the aggregate region
-        // scanner, i.e.,  one row pages. In other words, 0ms page allows only 
one row to be processed
-        // within one page; 0ms page is equivalent to one-row page
-        if (conf.getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0) == 0) {
-            conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
-        }
+        conf.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, 20); // 20ms
         return conf;
     }
 

Reply via email to