This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new f7ea38bf41 PHOENIX-7306 Metadata lookup should be permitted only
within query timeout (#1880)
f7ea38bf41 is described below
commit f7ea38bf4109c27859e6a6a043c0d4370d7f8526
Author: Viraj Jasani <[email protected]>
AuthorDate: Sat May 4 14:33:10 2024 -0800
PHOENIX-7306 Metadata lookup should be permitted only within query timeout
(#1880)
---
.../org/apache/phoenix/end2end/MapReduceIT.java | 8 ++-
.../end2end/SkipScanAfterManualSplitIT.java | 10 ++-
.../phoenix/iterate/PhoenixQueryTimeoutIT.java | 19 +++++
.../iterate/RoundRobinResultIteratorIT.java | 10 ++-
.../phoenix/schema/stats/BaseStatsCollectorIT.java | 9 +--
.../apache/phoenix/cache/ServerCacheClient.java | 16 ++++-
.../iterate/DefaultParallelScanGrouper.java | 6 +-
.../iterate/MapReduceParallelScanGrouper.java | 6 +-
.../phoenix/query/ConnectionQueryServices.java | 45 +++++++++++-
.../phoenix/query/ConnectionQueryServicesImpl.java | 82 +++++++++++++++++++---
.../query/ConnectionlessQueryServicesImpl.java | 29 +++++++-
.../query/DelegateConnectionQueryServices.java | 24 ++++++-
.../org/apache/phoenix/query/QueryServices.java | 2 +-
.../phoenix/cache/ServerCacheClientTest.java | 7 +-
.../apache/phoenix/compile/QueryCompilerTest.java | 3 +-
.../query/ConnectionQueryServicesImplTest.java | 12 +---
.../phoenix/query/ParallelIteratorsSplitTest.java | 5 +-
17 files changed, 248 insertions(+), 45 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 21b25bf92d..0631ddf6e6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -46,6 +46,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -210,7 +211,12 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
if (testVerySmallTimeOut) {
// run job and it should fail due to Timeout
- assertFalse("Job should fail with QueryTimeout.",
job.waitForCompletion(true));
+ try {
+ assertFalse("Job should fail with QueryTimeout.",
job.waitForCompletion(true));
+ } catch (RuntimeException e) {
+ assertTrue("Job execution failed with unexpected error.",
+ e.getCause() instanceof SQLTimeoutException);
+ }
} else {
//run
assertTrue("Job didn't complete successfully! Check logs for
reason.", job.waitForCompletion(true));
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 8dc558fe84..22361c7eb3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -105,7 +106,10 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
initTable(tableName);
Connection conn = getConnection();
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- int nRegions = services.getAllTableRegions(tableNameBytes).size();
+ int queryTimeout = services.getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ int nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
int nInitialRegions = nRegions;
Admin admin = services.getAdmin();
try {
@@ -113,7 +117,7 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
int nTries = 0;
while (nRegions == nInitialRegions && nTries < 10) {
Thread.sleep(1000);
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
nTries++;
}
// Split finished by this time, but cache isn't updated until
@@ -124,7 +128,7 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
String query = "SELECT count(*) FROM " + tableName + " WHERE a IN
('tl','jt',' a',' b',' c',' d')";
ResultSet rs1 = conn.createStatement().executeQuery(query);
assertTrue(rs1.next());
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nInitialRegions);
/*
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index fbaf6df952..1452b0e670 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -202,6 +202,25 @@ public class PhoenixQueryTimeoutIT extends
ParallelStatsDisabledIT {
}
@Test
+ public void testQueryTimeoutWithMetadataLookup() throws Exception {
+ PreparedStatement ps = loadDataAndPreparePagedQuery(0, 0);
+ try {
+ ResultSet rs = ps.executeQuery();
+ rs.next();
+ fail("Query timeout is 0ms");
+ } catch (SQLException e) {
+ Throwable t = e;
+ while (t != null && !(t instanceof SQLTimeoutException)) {
+ t = t.getCause();
+ }
+ if (t == null) {
+ fail("Expected query to fail with SQLTimeoutException");
+ }
+ assertEquals(OPERATION_TIMED_OUT.getErrorCode(),
+ ((SQLTimeoutException)t).getErrorCode());
+ }
+ }
+
public void
testScanningResultIteratorQueryTimeoutForPagingWithNormalLowTimeout() throws
Exception {
//Arrange
PreparedStatement ps = loadDataAndPreparePagedQuery(30000,30);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
index caeb45badd..68c86186e3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
@@ -78,7 +79,10 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
int numRows = setupTableForSplit(tableName);
Connection conn = getConnection();
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- int nRegions = services.getAllTableRegions(tableNameBytes).size();
+ int queryTimeout = services.getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ int nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
int nRegionsBeforeSplit = nRegions;
Admin admin = services.getAdmin();
try {
@@ -90,7 +94,7 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
long waitTimeMillis = 2000;
while (nRegions == nRegionsBeforeSplit && nTries < 10) {
latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
nTries++;
}
@@ -102,7 +106,7 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
while (rs.next()) {
numRowsRead++;
}
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nRegionsBeforeSplit);
assertEquals(numRows, numRowsRead);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
index 4f38efddf8..3994dd2ac4 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
@@ -59,7 +59,6 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
@@ -75,7 +74,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -83,7 +81,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -579,6 +576,9 @@ public abstract class BaseStatsCollectorIT extends BaseTest
{
+ "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER,
c.v INTEGER NULL, d.v INTEGER NULL) "
+ tableDDLOptions );
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "
VALUES(?,?, ?, ?, ?)");
+ int queryTimeout =
conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
byte[] val = new byte[250];
for (int i = 0; i < nRows; i++) {
stmt.setString(1, Character.toString((char)('a' + i)) +
Bytes.toString(val));
@@ -621,7 +621,8 @@ public abstract class BaseStatsCollectorIT extends BaseTest
{
assertEquals(physicalTableName, planAttributes.getTableName());
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- List<HRegionLocation> regions =
services.getAllTableRegions(Bytes.toBytes(physicalTableName));
+ List<HRegionLocation> regions =
+ services.getAllTableRegions(Bytes.toBytes(physicalTableName),
queryTimeout);
assertEquals(1, regions.size());
collectStatistics(conn, fullTableName, Long.toString(1000));
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 600c74b0a4..eb7870dd76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -226,7 +226,8 @@ public class ServerCacheClient {
PTable cacheUsingTable = delegate.getTableRef().getTable();
ConnectionQueryServices services =
delegate.getContext().getConnection().getQueryServices();
List<HRegionLocation> locations = services.getAllTableRegions(
- cacheUsingTable.getPhysicalName().getBytes());
+ cacheUsingTable.getPhysicalName().getBytes(),
+
delegate.getContext().getStatement().getQueryTimeoutInMillis());
int nRegions = locations.size();
Set<HRegionLocation> servers = new HashSet<>(nRegions);
cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
@@ -268,7 +269,12 @@ public class ServerCacheClient {
ExecutorService executor = services.getExecutor();
List<Future<Boolean>> futures = Collections.emptyList();
try {
- List<HRegionLocation> locations =
services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
+ int queryTimeout = connection.getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ List<HRegionLocation> locations =
+
services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes(),
+ queryTimeout);
int nRegions = locations.size();
// Size these based on worst case
futures = new ArrayList<Future<Boolean>>(nRegions);
@@ -380,7 +386,11 @@ public class ServerCacheClient {
byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
iterateOverTable = services.getTable(tableName);
- List<HRegionLocation> locations =
services.getAllTableRegions(tableName);
+ int queryTimeout = connection.getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ List<HRegionLocation> locations =
services.getAllTableRegions(tableName, queryTimeout);
+
/**
* Allow for the possibility that the region we based where to
send our cache has split and been relocated
* to another region server *after* we sent it, but before we
removed it. To accommodate this, we iterate
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
index 5252be410e..23ea797486 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -71,7 +71,8 @@ public class DefaultParallelScanGrouper implements
ParallelScanGrouper {
@Override
public List<HRegionLocation> getRegionBoundaries(StatementContext context,
byte[] tableName)
throws SQLException {
- return
context.getConnection().getQueryServices().getAllTableRegions(tableName);
+ return
context.getConnection().getQueryServices().getAllTableRegions(tableName,
+ context.getStatement().getQueryTimeoutInMillis());
}
/**
@@ -82,6 +83,7 @@ public class DefaultParallelScanGrouper implements
ParallelScanGrouper {
byte[] tableName, byte[] startRegionBoundaryKey, byte[]
stopRegionBoundaryKey)
throws SQLException {
return context.getConnection().getQueryServices()
- .getTableRegions(tableName, startRegionBoundaryKey,
stopRegionBoundaryKey);
+ .getTableRegions(tableName, startRegionBoundaryKey,
stopRegionBoundaryKey,
+ context.getStatement().getQueryTimeoutInMillis());
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 511467e693..bed1ead96d 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -68,7 +68,8 @@ public class MapReduceParallelScanGrouper implements
ParallelScanGrouper {
if ((snapshotName = getSnapshotName(conf)) != null) {
return getRegionLocationsFromSnapshot(conf,
snapshotName);
} else {
- return
context.getConnection().getQueryServices().getAllTableRegions(tableName);
+ return
context.getConnection().getQueryServices().getAllTableRegions(tableName,
+
context.getStatement().getQueryTimeoutInMillis());
}
}
@@ -84,7 +85,8 @@ public class MapReduceParallelScanGrouper implements
ParallelScanGrouper {
return getRegionLocationsFromSnapshot(conf,
snapshotName);
} else {
return context.getConnection().getQueryServices()
- .getTableRegions(tableName,
startRegionBoundaryKey, stopRegionBoundaryKey);
+ .getTableRegions(tableName,
startRegionBoundaryKey, stopRegionBoundaryKey,
+
context.getStatement().getQueryTimeoutInMillis());
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index cb56bd47a7..f3bb7c3c37 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -97,13 +97,37 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
public TableDescriptor getTableDescriptor(byte[] tableName) throws
SQLException;
public HRegionLocation getTableRegionLocation(byte[] tableName, byte[]
row) throws SQLException;
+
+ /**
+ * Retrieve the region metadata locations for all regions of the given
table.
+ * This method is deprecated. Use {@link #getAllTableRegions(byte[], int)}
instead.
+ *
+ * @param tableName The table name.
+ * @return The list of table region locations.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ @Deprecated
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException;
+ /**
+ * Retrieve the region metadata locations for all regions of the given
table.
+ * The operation to retrieve the table region locations must be completed
within
+ * the query timeout.
+ *
+ * @param tableName Table name.
+ * @param queryTimeout Phoenix query timeout.
+ * @return The list of region locations.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException;
+
/**
* Retrieve table region locations that cover the startRowKey and
endRowKey. The start key
* of the first region of the returned list must be less than or equal to
startRowKey.
* The end key of the last region of the returned list must be greater
than or equal to
* endRowKey.
+ * This method is deprecated. Use {@link #getTableRegions(byte[], byte[],
byte[], int)} instead.
*
* @param tableName Table name.
* @param startRowKey Start RowKey.
@@ -111,8 +135,27 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
* @return The list of region locations that cover the startRowKey and
endRowKey key boundary.
* @throws SQLException If fails to retrieve region locations.
*/
+ @Deprecated
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey) throws
SQLException;
+
+ /**
+ * Retrieve table region locations that cover the startRowKey and
endRowKey. The start key
+ * of the first region of the returned list must be less than or equal to
startRowKey.
+ * The end key of the last region of the returned list must be greater
than or equal to
+ * endRowKey. The operation to retrieve the table region locations must be
completed within
+ * the query timeout.
+ *
+ * @param tableName Table name.
+ * @param startRowKey Start RowKey.
+ * @param endRowKey End RowKey.
+ * @param queryTimeout Phoenix query timeout.
+ * @return The list of region locations that cover the startRowKey and
endRowKey key boundary.
+ * @throws SQLException If fails to retrieve region locations.
+ */
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException;
+ byte[] endRowKey,
+ int queryTimeout) throws
SQLException;
public PhoenixConnection connect(String url, Properties info) throws
SQLException;
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index e7dad8e149..def7f84bab 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -84,6 +84,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
@@ -647,7 +648,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[]
currentKey,
- HRegionLocation prevRegionLocation) throws IOException {
+ HRegionLocation prevRegionLocation) {
// in order to check the overlap/inconsistencies bad region info, we
have to make sure
// the current endKey always increasing(compare the previous endKey)
@@ -671,9 +672,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
&& !Bytes.equals(prevRegionLocation.getRegion().getEndKey(),
HConstants.EMPTY_START_ROW)
&& !Bytes.equals(regionLocation.getRegion().getEndKey(),
HConstants.EMPTY_END_ROW);
if (conditionOne || conditionTwo) {
- String regionNameString =
- new String(regionLocation.getRegion().getRegionName(),
StandardCharsets.UTF_8);
- LOGGER.error(
+ LOGGER.warn(
"HBase region overlap/inconsistencies on {} , current key: {}
, region startKey:"
+ " {} , region endKey: {} , prev region startKey: {} ,
prev region endKey: {}",
regionLocation,
@@ -684,17 +683,29 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
"null" :
Bytes.toStringBinary(prevRegionLocation.getRegion().getStartKey()),
prevRegionLocation == null ?
"null" :
Bytes.toStringBinary(prevRegionLocation.getRegion().getEndKey()));
- throw new IOException(
- String.format("HBase region information
overlap/inconsistencies on region %s",
- regionNameString));
}
return regionLocation.getRegion().getEndKey();
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
+ int queryTimeout =
this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, queryTimeout);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
- HConstants.EMPTY_END_ROW);
+ HConstants.EMPTY_END_ROW, queryTimeout);
}
/**
@@ -702,7 +713,19 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException{
+ int queryTimeout =
this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ return getTableRegions(tableName, startRowKey, endRowKey,
queryTimeout);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(final byte[] tableName, final
byte[] startRowKey,
+ final byte[] endRowKey, final
int queryTimeout)
+ throws SQLException {
/*
* Use HConnection.getRegionLocation as it uses the cache in
HConnection, while getting
* all region locations from the HTable doesn't.
@@ -712,6 +735,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
config.getInt(PHOENIX_GET_REGIONS_RETRIES,
DEFAULT_PHOENIX_GET_REGIONS_RETRIES);
TableName table = TableName.valueOf(tableName);
byte[] currentKey = null;
+ final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ final long maxQueryEndTime = startTime + queryTimeout;
while (true) {
try {
// We could surface the package projected
HConnectionImplementation.getNumberOfCachedRegionLocations
@@ -732,6 +757,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
&& Bytes.compareTo(currentKey, endRowKey) >= 0) {
break;
}
+ throwErrorIfQueryTimedOut(startRowKey, endRowKey,
maxQueryEndTime,
+ queryTimeout, table, retryCount, currentKey);
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
return locations;
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -756,6 +783,43 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
}
+ /**
+ * Throws an error if the metadata lookup takes longer than the configured
query timeout.
+ *
+ * @param startRowKey Start RowKey to begin the region metadata lookup
from.
+ * @param endRowKey End RowKey to end the region metadata lookup at.
+ * @param maxQueryEndTime Max time to execute the metadata lookup.
+ * @param queryTimeout Query timeout.
+ * @param table Table Name.
+ * @param retryCount Retry Count.
+ * @param currentKey Current Key.
+ * @throws SQLException If the metadata lookup takes longer than the
configured query timeout.
+ */
+ private static void throwErrorIfQueryTimedOut(byte[] startRowKey, byte[]
endRowKey,
+ long maxQueryEndTime,
+ int queryTimeout, TableName
table, int retryCount,
+ byte[] currentKey) throws
SQLException {
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ if (currentTime >= maxQueryEndTime) {
+ LOGGER.error("getTableRegions has exceeded query timeout {} ms."
+ + "Table: {}, retryCount: {} , currentKey: {} , "
+ + "startRowKey: {} , endRowKey: {}",
+ queryTimeout,
+ table.getNameAsString(),
+ retryCount,
+ Bytes.toStringBinary(currentKey),
+ Bytes.toStringBinary(startRowKey),
+ Bytes.toStringBinary(endRowKey)
+ );
+ final String message = "getTableRegions has exceeded query timeout
" + queryTimeout
+ + "ms";
+ IOException e = new IOException(message);
+ throw new SQLTimeoutException(message,
+ SQLExceptionCode.OPERATION_TIMED_OUT.getSQLState(),
+ SQLExceptionCode.OPERATION_TIMED_OUT.getErrorCode(), e);
+ }
+ }
+
public PMetaData getMetaDataCache() {
return latestMetaData;
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 2a048c7319..ef26d642d6 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -219,9 +219,23 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
- return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW);
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW,
+ queryTimeout);
}
/**
@@ -229,7 +243,18 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException {
+ return getTableRegions(tableName, startRowKey, endRowKey,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey, int
queryTimeout)
+ throws SQLException {
List<HRegionLocation> regions =
tableSplits.get(Bytes.toString(tableName));
if (regions != null) {
return regions;
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 302a1203f6..7e89dc33be 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -81,20 +81,42 @@ public class DelegateConnectionQueryServices extends
DelegateQueryServices imple
return getDelegate().getTableIfExists(tableName);
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
return getDelegate().getAllTableRegions(tableName);
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
+ return getDelegate().getAllTableRegions(tableName, queryTimeout);
+ }
+
/**
* {@inheritDoc}.
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException {
return getDelegate().getTableRegions(tableName, startRowKey,
endRowKey);
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey, int
queryTimeout)
+ throws SQLException {
+ return getDelegate().getTableRegions(tableName, startRowKey,
endRowKey, queryTimeout);
+ }
+
@Override
public void addTable(PTable table, long resolvedTime) throws SQLException {
getDelegate().addTable(table, resolvedTime);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c01edfbbc3..8842ac3ce5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -435,7 +435,7 @@ public interface QueryServices extends SQLCloseable {
*/
String PHOENIX_GET_REGIONS_RETRIES = "phoenix.get.table.regions.retries";
- int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 3;
+ int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10;
/**
* Get executor service used for parallel scans
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
index 496fb1cc42..875819540b 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
@@ -18,11 +18,14 @@ package org.apache.phoenix.cache;
import static org.junit.Assert.assertEquals;
import java.sql.SQLException;
+import java.util.HashMap;
+
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Test;
import org.mockito.Mockito;
@@ -32,11 +35,13 @@ public class ServerCacheClientTest {
PhoenixConnection connection = Mockito.mock(PhoenixConnection.class);
ConnectionQueryServices services =
Mockito.mock(ConnectionQueryServices.class);
Mockito.when(services.getExecutor()).thenReturn(null);
+ Mockito.when(services.getProps()).thenReturn(new ReadOnlyProps(new
HashMap<>()));
Mockito.when(connection.getQueryServices()).thenReturn(services);
byte[] tableName = Bytes.toBytes("TableName");
PTableImpl pTable = Mockito.mock(PTableImpl.class);
Mockito.when(pTable.getPhysicalName()).thenReturn(PNameFactory.newName("TableName"));
- Mockito.when(services.getAllTableRegions(tableName)).thenThrow(new
SQLException("Test Exception"));
+ Mockito.when(services.getAllTableRegions(tableName,
600000)).thenThrow(new SQLException(
+ "Test Exception"));
ServerCacheClient client = new ServerCacheClient(connection);
try {
client.addServerCache(null, null, null, null, pTable, false);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 425073357f..dc27069c66 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2562,7 +2562,8 @@ public class QueryCompilerTest extends
BaseConnectionlessQueryTest {
ScanRanges ranges=plan.getContext().getScanRanges();
List<HRegionLocation> regionLocations=
-
conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900"));
+
conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900"),
+ 60000);
for (HRegionLocation regionLocation : regionLocations) {
assertTrue(ranges.intersectRegion(regionLocation.getRegion().getStartKey(),
regionLocation.getRegion().getEndKey(), false));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 078fc8d8e2..4e6e615c90 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -245,16 +246,7 @@ public class ConnectionQueryServicesImplTest {
private void testGetNextRegionStartKey(ConnectionQueryServicesImpl
mockCqsi,
HRegionLocation mockRegionLocation, byte[] key, boolean isCorrupted,
HRegionLocation mockPrevRegionLocation) {
- try {
- mockCqsi.getNextRegionStartKey(mockRegionLocation, key,
mockPrevRegionLocation);
- if (isCorrupted) {
- fail();
- }
- } catch (IOException e) {
- if (!isCorrupted) {
- fail();
- }
- }
+ mockCqsi.getNextRegionStartKey(mockRegionLocation, key,
mockPrevRegionLocation);
}
@Test
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index d10c96f174..7890b0795d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -112,7 +112,10 @@ public class ParallelIteratorsSplitTest extends
BaseConnectionlessQueryTest {
PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(),
TABLE_NAME));
TableRef tableRef = new TableRef(table);
- List<HRegionLocation> regions =
pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
+ List<HRegionLocation> regions =
+ pconn.getQueryServices()
+
.getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes(),
+ 60000);
List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);
assertEquals("Unexpected number of splits: " + ranges.size(),
expectedSplits.size(), ranges.size());
for (int i=0; i<expectedSplits.size(); i++) {