This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new c618921f3a PHOENIX-7306 Metadata lookup should be permitted only
within query timeout (#1880)
c618921f3a is described below
commit c618921f3a67bb98b17169631eee564e3e6afc80
Author: Viraj Jasani <[email protected]>
AuthorDate: Sat May 4 14:33:10 2024 -0800
PHOENIX-7306 Metadata lookup should be permitted only within query timeout
(#1880)
---
.../apache/phoenix/cache/ServerCacheClient.java | 16 ++++-
.../iterate/DefaultParallelScanGrouper.java | 6 +-
.../phoenix/query/ConnectionQueryServices.java | 45 +++++++++++-
.../phoenix/query/ConnectionQueryServicesImpl.java | 82 +++++++++++++++++++---
.../query/ConnectionlessQueryServicesImpl.java | 29 +++++++-
.../query/DelegateConnectionQueryServices.java | 24 ++++++-
.../org/apache/phoenix/query/QueryServices.java | 2 +-
.../iterate/MapReduceParallelScanGrouper.java | 6 +-
.../org/apache/phoenix/end2end/MapReduceIT.java | 8 ++-
.../end2end/SkipScanAfterManualSplitIT.java | 10 ++-
.../phoenix/iterate/PhoenixQueryTimeoutIT.java | 21 +++++-
.../iterate/RoundRobinResultIteratorIT.java | 10 ++-
.../monitoring/PhoenixTableLevelMetricsIT.java | 46 +++++++++---
.../phoenix/schema/stats/BaseStatsCollectorIT.java | 9 +--
.../phoenix/cache/ServerCacheClientTest.java | 7 +-
.../apache/phoenix/compile/QueryCompilerTest.java | 4 +-
.../query/ConnectionQueryServicesImplTest.java | 15 ++--
.../phoenix/query/ParallelIteratorsSplitTest.java | 5 +-
18 files changed, 289 insertions(+), 56 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 6700ce7c50..9c6bb11695 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -225,7 +225,8 @@ public class ServerCacheClient {
PTable cacheUsingTable = delegate.getTableRef().getTable();
ConnectionQueryServices services =
delegate.getContext().getConnection().getQueryServices();
List<HRegionLocation> locations = services.getAllTableRegions(
- cacheUsingTable.getPhysicalName().getBytes());
+ cacheUsingTable.getPhysicalName().getBytes(),
+
delegate.getContext().getStatement().getQueryTimeoutInMillis());
int nRegions = locations.size();
Set<HRegionLocation> servers = new HashSet<>(nRegions);
cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
@@ -267,7 +268,12 @@ public class ServerCacheClient {
ExecutorService executor = services.getExecutor();
List<Future<Boolean>> futures = Collections.emptyList();
try {
- List<HRegionLocation> locations =
services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
+ int queryTimeout = connection.getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ List<HRegionLocation> locations =
+
services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes(),
+ queryTimeout);
int nRegions = locations.size();
// Size these based on worst case
futures = new ArrayList<Future<Boolean>>(nRegions);
@@ -379,7 +385,11 @@ public class ServerCacheClient {
byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
iterateOverTable = services.getTable(tableName);
- List<HRegionLocation> locations =
services.getAllTableRegions(tableName);
+ int queryTimeout = connection.getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ List<HRegionLocation> locations =
services.getAllTableRegions(tableName, queryTimeout);
+
/**
* Allow for the possibility that the region we based where to
send our cache has split and been relocated
* to another region server *after* we sent it, but before we
removed it. To accommodate this, we iterate
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
index 5252be410e..23ea797486 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -71,7 +71,8 @@ public class DefaultParallelScanGrouper implements
ParallelScanGrouper {
@Override
public List<HRegionLocation> getRegionBoundaries(StatementContext context,
byte[] tableName)
throws SQLException {
- return
context.getConnection().getQueryServices().getAllTableRegions(tableName);
+ return
context.getConnection().getQueryServices().getAllTableRegions(tableName,
+ context.getStatement().getQueryTimeoutInMillis());
}
/**
@@ -82,6 +83,7 @@ public class DefaultParallelScanGrouper implements
ParallelScanGrouper {
byte[] tableName, byte[] startRegionBoundaryKey, byte[]
stopRegionBoundaryKey)
throws SQLException {
return context.getConnection().getQueryServices()
- .getTableRegions(tableName, startRegionBoundaryKey,
stopRegionBoundaryKey);
+ .getTableRegions(tableName, startRegionBoundaryKey,
stopRegionBoundaryKey,
+ context.getStatement().getQueryTimeoutInMillis());
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index ffb5859431..95ea0bab42 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -99,13 +99,37 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
public TableDescriptor getTableDescriptor(byte[] tableName) throws
SQLException;
public HRegionLocation getTableRegionLocation(byte[] tableName, byte[]
row) throws SQLException;
+
+ /**
+ * Retrieve the region metadata locations for all regions of the given
table.
+ * This method is Deprecated. Use {@link #getAllTableRegions(byte[], int)}
instead.
+ *
+ * @param tableName The table name.
+ * @return The list of table region locations.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ @Deprecated
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException;
+ /**
+ * Retrieve the region metadata locations for all regions of the given
table.
+ * The operation to retrieve the table region locations must be completed
within
+ * the query timeout.
+ *
+ * @param tableName Table name.
+ * @param queryTimeout Phoenix query timeout.
+ * @return The list of region locations.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException;
+
/**
* Retrieve table region locations that cover the startRowKey and
endRowKey. The start key
* of the first region of the returned list must be less than or equal to
startRowKey.
* The end key of the last region of the returned list must be greater
than or equal to
* endRowKey.
+ * This method is Deprecated. Use {@link #getTableRegions(byte[], byte[],
byte[], int)} instead.
*
* @param tableName Table name.
* @param startRowKey Start RowKey.
@@ -113,8 +137,27 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
* @return The list of region locations that cover the startRowKey and
endRowKey key boundary.
* @throws SQLException If fails to retrieve region locations.
*/
+ @Deprecated
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey) throws
SQLException;
+
+ /**
+ * Retrieve table region locations that cover the startRowKey and
endRowKey. The start key
+ * of the first region of the returned list must be less than or equal to
startRowKey.
+ * The end key of the last region of the returned list must be greater
than or equal to
+ * endRowKey. The operation to retrieve the table region locations must be
completed within
+ * the query timeout.
+ *
+ * @param tableName Table name.
+ * @param startRowKey Start RowKey.
+ * @param endRowKey End RowKey.
+ * @param queryTimeout Phoenix query timeout.
+ * @return The list of region locations that cover the startRowKey and
endRowKey key boundary.
+ * @throws SQLException If fails to retrieve region locations.
+ */
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException;
+ byte[] endRowKey,
+ int queryTimeout) throws
SQLException;
public PhoenixConnection connect(String url, Properties info) throws
SQLException;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index df4aaf5d05..b05d352bc8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -93,6 +93,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
@@ -708,7 +709,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[]
currentKey,
- HRegionLocation prevRegionLocation) throws IOException {
+ HRegionLocation prevRegionLocation) {
// in order to check the overlap/inconsistencies bad region info, we
have to make sure
// the current endKey always increasing(compare the previous endKey)
@@ -733,9 +734,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
&& !Bytes.equals(regionLocation.getRegion().getEndKey(),
HConstants.EMPTY_END_ROW);
if (conditionOne || conditionTwo) {
GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.increment();
- String regionNameString =
- new String(regionLocation.getRegion().getRegionName(),
StandardCharsets.UTF_8);
- LOGGER.error(
+ LOGGER.warn(
"HBase region overlap/inconsistencies on {} , current key: {}
, region startKey:"
+ " {} , region endKey: {} , prev region startKey: {} ,
prev region endKey: {}",
regionLocation,
@@ -746,17 +745,29 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
"null" :
Bytes.toStringBinary(prevRegionLocation.getRegion().getStartKey()),
prevRegionLocation == null ?
"null" :
Bytes.toStringBinary(prevRegionLocation.getRegion().getEndKey()));
- throw new IOException(
- String.format("HBase region information
overlap/inconsistencies on region %s",
- regionNameString));
}
return regionLocation.getRegion().getEndKey();
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
+ int queryTimeout =
this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, queryTimeout);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
- HConstants.EMPTY_END_ROW);
+ HConstants.EMPTY_END_ROW, queryTimeout);
}
/**
@@ -764,7 +775,19 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException{
+ int queryTimeout =
this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ return getTableRegions(tableName, startRowKey, endRowKey,
queryTimeout);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(final byte[] tableName, final
byte[] startRowKey,
+ final byte[] endRowKey, final
int queryTimeout)
+ throws SQLException {
/*
* Use HConnection.getRegionLocation as it uses the cache in
HConnection, while getting
* all region locations from the HTable doesn't.
@@ -774,6 +797,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
config.getInt(PHOENIX_GET_REGIONS_RETRIES,
DEFAULT_PHOENIX_GET_REGIONS_RETRIES);
TableName table = TableName.valueOf(tableName);
byte[] currentKey = null;
+ final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ final long maxQueryEndTime = startTime + queryTimeout;
while (true) {
try {
// We could surface the package projected
HConnectionImplementation.getNumberOfCachedRegionLocations
@@ -794,6 +819,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
&& Bytes.compareTo(currentKey, endRowKey) >= 0) {
break;
}
+ throwErrorIfQueryTimedOut(startRowKey, endRowKey,
maxQueryEndTime,
+ queryTimeout, table, retryCount, currentKey);
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
return locations;
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -818,6 +845,43 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
}
+ /**
+ * Throw Error if the metadata lookup takes longer than query timeout
configured.
+ *
+ * @param startRowKey Start RowKey to begin the region metadata lookup
from.
+ * @param endRowKey End RowKey to end the region metadata lookup at.
+ * @param maxQueryEndTime Max time to execute the metadata lookup.
+ * @param queryTimeout Query timeout.
+ * @param table Table Name.
+ * @param retryCount Retry Count.
+ * @param currentKey Current Key.
+ * @throws SQLException Throw Error if the metadata lookup takes longer
than query timeout.
+ */
+ private static void throwErrorIfQueryTimedOut(byte[] startRowKey, byte[]
endRowKey,
+ long maxQueryEndTime,
+ int queryTimeout, TableName
table, int retryCount,
+ byte[] currentKey) throws
SQLException {
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ if (currentTime >= maxQueryEndTime) {
+ LOGGER.error("getTableRegions has exceeded query timeout {} ms."
+ + "Table: {}, retryCount: {} , currentKey: {} , "
+ + "startRowKey: {} , endRowKey: {}",
+ queryTimeout,
+ table.getNameAsString(),
+ retryCount,
+ Bytes.toStringBinary(currentKey),
+ Bytes.toStringBinary(startRowKey),
+ Bytes.toStringBinary(endRowKey)
+ );
+ final String message = "getTableRegions has exceeded query timeout
" + queryTimeout
+ + "ms";
+ IOException e = new IOException(message);
+ throw new SQLTimeoutException(message,
+ SQLExceptionCode.OPERATION_TIMED_OUT.getSQLState(),
+ SQLExceptionCode.OPERATION_TIMED_OUT.getErrorCode(), e);
+ }
+ }
+
public PMetaData getMetaDataCache() {
return latestMetaData;
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 470a44fb76..f0e5277090 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -223,9 +223,23 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
- return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW);
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW,
+ queryTimeout);
}
/**
@@ -233,7 +247,18 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException {
+ return getTableRegions(tableName, startRowKey, endRowKey,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey, int
queryTimeout)
+ throws SQLException {
List<HRegionLocation> regions =
tableSplits.get(Bytes.toString(tableName));
if (regions != null) {
return regions;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 3e3f9f1ab3..9945896dd7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -82,20 +82,42 @@ public class DelegateConnectionQueryServices extends
DelegateQueryServices imple
return getDelegate().getTableIfExists(tableName);
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws
SQLException {
return getDelegate().getAllTableRegions(tableName);
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName, int
queryTimeout)
+ throws SQLException {
+ return getDelegate().getAllTableRegions(tableName, queryTimeout);
+ }
+
/**
* {@inheritDoc}.
*/
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException {
return getDelegate().getTableRegions(tableName, startRowKey,
endRowKey);
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey, int
queryTimeout)
+ throws SQLException {
+ return getDelegate().getTableRegions(tableName, startRowKey,
endRowKey, queryTimeout);
+ }
+
@Override
public void addTable(PTable table, long resolvedTime) throws SQLException {
getDelegate().addTable(table, resolvedTime);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 67187ec09b..0279377d5a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -484,7 +484,7 @@ public interface QueryServices extends SQLCloseable {
*/
String PHOENIX_GET_REGIONS_RETRIES = "phoenix.get.table.regions.retries";
- int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 3;
+ int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10;
/**
* Get executor service used for parallel scans
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 511467e693..bed1ead96d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -68,7 +68,8 @@ public class MapReduceParallelScanGrouper implements
ParallelScanGrouper {
if ((snapshotName = getSnapshotName(conf)) != null) {
return getRegionLocationsFromSnapshot(conf,
snapshotName);
} else {
- return
context.getConnection().getQueryServices().getAllTableRegions(tableName);
+ return
context.getConnection().getQueryServices().getAllTableRegions(tableName,
+
context.getStatement().getQueryTimeoutInMillis());
}
}
@@ -84,7 +85,8 @@ public class MapReduceParallelScanGrouper implements
ParallelScanGrouper {
return getRegionLocationsFromSnapshot(conf,
snapshotName);
} else {
return context.getConnection().getQueryServices()
- .getTableRegions(tableName,
startRegionBoundaryKey, stopRegionBoundaryKey);
+ .getTableRegions(tableName,
startRegionBoundaryKey, stopRegionBoundaryKey,
+
context.getStatement().getQueryTimeoutInMillis());
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 21b25bf92d..0631ddf6e6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -46,6 +46,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -210,7 +211,12 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
if (testVerySmallTimeOut) {
// run job and it should fail due to Timeout
- assertFalse("Job should fail with QueryTimeout.",
job.waitForCompletion(true));
+ try {
+ assertFalse("Job should fail with QueryTimeout.",
job.waitForCompletion(true));
+ } catch (RuntimeException e) {
+ assertTrue("Job execution failed with unexpected error.",
+ e.getCause() instanceof SQLTimeoutException);
+ }
} else {
//run
assertTrue("Job didn't complete successfully! Check logs for
reason.", job.waitForCompletion(true));
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 8dc558fe84..22361c7eb3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -105,7 +106,10 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
initTable(tableName);
Connection conn = getConnection();
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- int nRegions = services.getAllTableRegions(tableNameBytes).size();
+ int queryTimeout = services.getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ int nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
int nInitialRegions = nRegions;
Admin admin = services.getAdmin();
try {
@@ -113,7 +117,7 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
int nTries = 0;
while (nRegions == nInitialRegions && nTries < 10) {
Thread.sleep(1000);
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
nTries++;
}
// Split finished by this time, but cache isn't updated until
@@ -124,7 +128,7 @@ public class SkipScanAfterManualSplitIT extends
ParallelStatsDisabledIT {
String query = "SELECT count(*) FROM " + tableName + " WHERE a IN
('tl','jt',' a',' b',' c',' d')";
ResultSet rs1 = conn.createStatement().executeQuery(query);
assertTrue(rs1.next());
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nInitialRegions);
/*
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index 8157135e40..cdc1ca6900 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -253,8 +252,28 @@ public class PhoenixQueryTimeoutIT extends
ParallelStatsDisabledIT {
((SQLTimeoutException)t).getErrorCode());
} finally {
BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
+ EnvironmentEdgeManager.reset();
}
+ }
+ @Test
+ public void testQueryTimeoutWithMetadataLookup() throws Exception {
+ PreparedStatement ps = loadDataAndPreparePagedQuery(0, 0);
+ try {
+ ResultSet rs = ps.executeQuery();
+ rs.next();
+ fail("Query timeout is 0ms");
+ } catch (SQLException e) {
+ Throwable t = e;
+ while (t != null && !(t instanceof SQLTimeoutException)) {
+ t = t.getCause();
+ }
+ if (t == null) {
+ fail("Expected query to fail with SQLTimeoutException");
+ }
+ assertEquals(OPERATION_TIMED_OUT.getErrorCode(),
+ ((SQLTimeoutException)t).getErrorCode());
+ }
}
@Test
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
index caeb45badd..68c86186e3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
@@ -78,7 +79,10 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
int numRows = setupTableForSplit(tableName);
Connection conn = getConnection();
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- int nRegions = services.getAllTableRegions(tableNameBytes).size();
+ int queryTimeout = services.getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+ int nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
int nRegionsBeforeSplit = nRegions;
Admin admin = services.getAdmin();
try {
@@ -90,7 +94,7 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
long waitTimeMillis = 2000;
while (nRegions == nRegionsBeforeSplit && nTries < 10) {
latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
nTries++;
}
@@ -102,7 +106,7 @@ public class RoundRobinResultIteratorIT extends
ParallelStatsDisabledIT {
while (rs.next()) {
numRowsRead++;
}
- nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nRegions = services.getAllTableRegions(tableNameBytes,
queryTimeout).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nRegionsBeforeSplit);
assertEquals(numRows, numRowsRead);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index 4f15622d1d..ab00c2ace4 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -550,7 +550,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
/**
* After PHOENIX-6767 point lookup queries don't require to get table
regions using
- * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare
scans
+ * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to
prepare scans
* so custom driver defined here inject failures or delays don't have
effect.
* Hence skipping the test.
*/
@@ -574,7 +574,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
/**
* After PHOENIX-6767 point lookup queries don't require to get table
regions using
- * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare
scans
+ * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to
prepare scans
* so custom driver {@link PhoenixMetricsTestingDriver} defined here
inject failures or delays
* don't have effect. Hence skipping the test.
*/
@@ -1089,7 +1089,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
/**
* After PHOENIX-6767 point lookup queries don't require to get table
regions using
- * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare
scans
+ * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to
prepare scans
* so custom driver defined here inject failures or delays don't have
effect.
* Hence skipping the test.
*/
@@ -1133,7 +1133,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
/**
* After PHOENIX-6767 point lookup queries don't require to get table
regions using
- * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare
scans
+ * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to
prepare scans
* so custom driver defined here inject failures or delays don't have
effect.
* Hence skipping the test.
*/
@@ -1549,8 +1549,8 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
super(services, connectionInfo, info);
}
- // Make plan.iterator() fail (ultimately calls
CQSI.getAllTableRegions())
- @Override public List<HRegionLocation> getAllTableRegions(byte[]
tableName)
+ @Override
+ public List<HRegionLocation> getAllTableRegions(byte[] tableName)
throws SQLException {
if (failExecuteQueryAndClientSideDeletes) {
throw new
SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL).build().buildException();
@@ -1563,12 +1563,27 @@ public class PhoenixTableLevelMetricsIT extends
BaseTest {
return super.getAllTableRegions(tableName);
}
+ // Make plan.iterator() fail (ultimately calls
CQSI.getAllTableRegions())
+ @Override public List<HRegionLocation> getAllTableRegions(byte[]
tableName,
+ int
queryTimeout)
+ throws SQLException {
+ if (failExecuteQueryAndClientSideDeletes) {
+ throw new
SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL).build().buildException();
+ }
+ try {
+ Thread.sleep(injectDelay);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return super.getAllTableRegions(tableName, queryTimeout);
+ }
+
@Override
public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
- byte[] endRowKey) throws SQLException {
+ byte[] endRowKey) throws
SQLException {
if (failExecuteQueryAndClientSideDeletes) {
throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL)
- .build().buildException();
+ .build().buildException();
}
try {
Thread.sleep(injectDelay);
@@ -1577,6 +1592,21 @@ public class PhoenixTableLevelMetricsIT extends BaseTest
{
}
return super.getTableRegions(tableName, startRowKey, endRowKey);
}
+
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[]
startRowKey,
+ byte[] endRowKey, int
queryTimeout) throws SQLException {
+ if (failExecuteQueryAndClientSideDeletes) {
+ throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL)
+ .build().buildException();
+ }
+ try {
+ Thread.sleep(injectDelay);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return super.getTableRegions(tableName, startRowKey, endRowKey,
queryTimeout);
+ }
}
/**
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
index ca07528979..7e64ef69d5 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
@@ -60,7 +60,6 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
@@ -76,7 +75,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -84,7 +82,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -581,6 +578,9 @@ public abstract class BaseStatsCollectorIT extends BaseTest
{
+ "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER,
c.v INTEGER NULL, d.v INTEGER NULL) "
+ tableDDLOptions );
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "
VALUES(?,?, ?, ?, ?)");
+ int queryTimeout =
conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
+ .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
byte[] val = new byte[250];
for (int i = 0; i < nRows; i++) {
stmt.setString(1, Character.toString((char)('a' + i)) +
Bytes.toString(val));
@@ -623,7 +623,8 @@ public abstract class BaseStatsCollectorIT extends BaseTest
{
assertEquals(physicalTableName, planAttributes.getTableName());
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- List<HRegionLocation> regions =
services.getAllTableRegions(Bytes.toBytes(physicalTableName));
+ List<HRegionLocation> regions =
+ services.getAllTableRegions(Bytes.toBytes(physicalTableName),
queryTimeout);
assertEquals(1, regions.size());
collectStatistics(conn, fullTableName, Long.toString(1000));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
index 496fb1cc42..875819540b 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java
@@ -18,11 +18,14 @@ package org.apache.phoenix.cache;
import static org.junit.Assert.assertEquals;
import java.sql.SQLException;
+import java.util.HashMap;
+
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Test;
import org.mockito.Mockito;
@@ -32,11 +35,13 @@ public class ServerCacheClientTest {
PhoenixConnection connection = Mockito.mock(PhoenixConnection.class);
ConnectionQueryServices services =
Mockito.mock(ConnectionQueryServices.class);
Mockito.when(services.getExecutor()).thenReturn(null);
+ Mockito.when(services.getProps()).thenReturn(new ReadOnlyProps(new
HashMap<>()));
Mockito.when(connection.getQueryServices()).thenReturn(services);
byte[] tableName = Bytes.toBytes("TableName");
PTableImpl pTable = Mockito.mock(PTableImpl.class);
Mockito.when(pTable.getPhysicalName()).thenReturn(PNameFactory.newName("TableName"));
- Mockito.when(services.getAllTableRegions(tableName)).thenThrow(new
SQLException("Test Exception"));
+ Mockito.when(services.getAllTableRegions(tableName,
600000)).thenThrow(new SQLException(
+ "Test Exception"));
ServerCacheClient client = new ServerCacheClient(connection);
try {
client.addServerCache(null, null, null, null, pTable, false);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 17391345d4..9d3f962521 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -78,7 +78,6 @@ import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.CountAggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.expression.function.TimeUnit;
-import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
@@ -2563,7 +2562,8 @@ public class QueryCompilerTest extends
BaseConnectionlessQueryTest {
ScanRanges ranges=plan.getContext().getScanRanges();
List<HRegionLocation> regionLocations=
-
conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900"));
+
conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900"),
+ 60000);
for (HRegionLocation regionLocation : regionLocations) {
assertTrue(ranges.intersectRegion(regionLocation.getRegion().getStartKey(),
regionLocation.getRegion().getEndKey(), false));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 9052e79497..d0d19c23eb 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -252,19 +253,11 @@ public class ConnectionQueryServicesImplTest {
private void testGetNextRegionStartKey(ConnectionQueryServicesImpl
mockCqsi,
HRegionLocation mockRegionLocation, byte[] key, boolean isCorrupted,
HRegionLocation mockPrevRegionLocation) {
- try {
- mockCqsi.getNextRegionStartKey(mockRegionLocation, key,
mockPrevRegionLocation);
- if (isCorrupted) {
- fail();
- }
- } catch (IOException e) {
- if (!isCorrupted) {
- fail();
- }
- }
+ mockCqsi.getNextRegionStartKey(mockRegionLocation, key,
mockPrevRegionLocation);
assertEquals(isCorrupted ? 1 : 0,
-
GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.getMetric().getValue());
+
GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.getMetric()
+ .getValue());
}
@Test
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 4454ea5761..8433906f37 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -112,7 +112,10 @@ public class ParallelIteratorsSplitTest extends
BaseConnectionlessQueryTest {
PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(),
TABLE_NAME));
TableRef tableRef = new TableRef(table);
- List<HRegionLocation> regions =
pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
+ List<HRegionLocation> regions =
+ pconn.getQueryServices()
+
.getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes(),
+ 60000);
List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);
assertEquals("Unexpected number of splits: " + ranges.size(),
expectedSplits.size(), ranges.size());
for (int i=0; i<expectedSplits.size(); i++) {