This is an automated email from the ASF dual-hosted git repository. gjacoby pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 6bcd8011a6 PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589) 6bcd8011a6 is described below commit 6bcd8011a6152a0b5b53c04a1e2ef380616ccc39 Author: Lokesh Khurana <khuranalokes...@gmail.com> AuthorDate: Mon Apr 17 10:06:51 2023 -0700 PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589) Co-authored-by: Lokesh Khurana <lokesh.khur...@salesforce.com> --- .../org/apache/phoenix/end2end/MapReduceIT.java | 106 ++++++++++++++++++++- .../iterate/DelayedTableResultIteratorFactory.java | 9 +- .../phoenix/iterate/PhoenixQueryTimeoutIT.java | 65 ++++++++++++- .../phoenix/iterate/BaseResultIterators.java | 19 +++- .../phoenix/iterate/ChunkedResultIterator.java | 3 +- .../iterate/DefaultTableResultIteratorFactory.java | 5 +- .../apache/phoenix/iterate/ParallelIterators.java | 5 +- .../phoenix/iterate/ScanningResultIterator.java | 12 ++- .../apache/phoenix/iterate/SerialIterators.java | 10 +- .../phoenix/iterate/TableResultIterator.java | 19 ++-- .../iterate/TableResultIteratorFactory.java | 3 +- .../iterate/TableSnapshotResultIterator.java | 6 +- .../phoenix/mapreduce/PhoenixRecordReader.java | 8 +- 13 files changed, 233 insertions(+), 37 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java index 62082624ec..21b25bf92d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java @@ -30,6 +30,7 @@ import org.apache.phoenix.mapreduce.PhoenixOutputFormat; import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PhoenixArray; import org.apache.phoenix.util.PhoenixRuntime; @@ -107,6 +108,34 @@ public class MapReduceIT extends ParallelStatsDisabledIT { } } + @Test + public void testMapReduceWithVerySmallPhoenixQueryTimeout() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, true); + } + } + + @Test + public void testMapReduceWithVerySmallPhoenixQueryTimeoutWithTenantId() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, TENANT_ID, true); + } + } + + @Test + public void testMapReduceWithNormalPhoenixQueryTimeout() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, false); + } + } + + @Test + public void testMapReduceWithNormalPhoenixQueryTimeoutWithTenantId() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 81.04, TENANT_ID, false); + } + } + @Test public void testWithTenantId() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())){ @@ -116,7 +145,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT { } - private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws + private void createAndTestJob(Connection conn, String whereCondition, double maxExpected, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException { String stockTableName = generateUniqueName(); String stockStatsTableName = generateUniqueName(); @@ -126,12 +155,79 @@ public class MapReduceIT extends ParallelStatsDisabledIT { final Configuration conf = getUtility().getConfiguration(); Job job = Job.getInstance(conf); if (tenantId != null) { - setInputForTenant(job, tenantId, stockTableName, s); + setInputForTenant(job, tenantId, stockTableName, whereCondition); + } else { + PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, + stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + } + testJob(conn, job, stockTableName, stockStatsTableName, maxExpected); + + } + + private void createPagedJobAndTestFailedJobDueToTimeOut(Connection conn, String whereCondition, double maxExpected, String tenantId, + boolean testVerySmallTimeOut) throws SQLException, IOException, InterruptedException, ClassNotFoundException { + String stockTableName = generateUniqueName(); + String stockStatsTableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName)); + conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName)); + conn.commit(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + if (testVerySmallTimeOut) { + //Setting Paging Size to 0 and Query Timeout to 1ms so that query get paged quickly and times out immediately, + //Need to set this at conf level as queryPlan generated at PhoenixInputFormat creates a new connection from + //JobContext's configuration. + conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0)); + conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(1)); + } else { + conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0)); + conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(600000)); + } + + Job job = Job.getInstance(conf); + if (tenantId != null) { + setInputForTenant(job, tenantId, stockTableName, whereCondition); } else { PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, - stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + } + + assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0, + TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries()); + upsertData(conn, stockTableName); + + // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints + job.getConfiguration().set("mapreduce.framework.name", "local"); + setOutput(job, stockStatsTableName); + + job.setMapperClass(StockMapper.class); + job.setReducerClass(StockReducer.class); + job.setOutputFormatClass(PhoenixOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(DoubleWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(StockWritable.class); + + if (testVerySmallTimeOut) { + // run job and it should fail due to Timeout + assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true)); + } else { + //run + assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true)); + + //verify + ResultSet stats = DriverManager.getConnection(getUrl()).createStatement() + .executeQuery("SELECT * FROM " + stockStatsTableName); + assertTrue("No data stored in stats table!", stats.next()); + String name = stats.getString(1); + double max = stats.getDouble(2); + assertEquals("Got the wrong stock name!", "AAPL", name); + assertEquals("Max value didn't match the expected!", maxExpected, max, 0); + assertFalse("Should only have stored one row in stats table!", stats.next()); + assertEquals("There should have been only be 1 call to getRegionBoundaries " + + "(corresponding to the driver code)", 1, + TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries()); } - testJob(conn, job, stockTableName, stockStatsTableName, v); } @@ -199,8 +295,10 @@ public class MapReduceIT extends ParallelStatsDisabledIT { private void upsertData(Connection conn, String stockTableName) throws SQLException { PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName)); + upsertData(stmt, "AAPL", 2010, new Double[]{73.48, 82.25, 75.2, 82.89}); upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3}); upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3}); + upsertData(stmt, "AAPL", 2007, new Double[]{73.88, 80.24, 78.9, 66.3}); conn.commit(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java index 23bfebd449..74cb7f472e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java @@ -45,16 +45,17 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, - QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, + long maxQueryEndTime) throws SQLException { return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder, - renewLeaseThreshold, plan, scanGrouper, caches); + renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime); } private class DelayedTableResultIterator extends TableResultIterator { public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, - ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches); + ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException { + super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime); } @Override diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java index 45adcfbc68..46a62d9c53 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.iterate; +import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -29,8 +30,10 @@ import java.util.Properties; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.end2end.ParallelStatsDisabledTest; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -49,14 +52,13 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT { tableName = generateUniqueName(); int numRows = 1000; String ddl = - "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + "CREATE TABLE " + tableName + " (K INTEGER NOT NULL PRIMARY KEY, V VARCHAR)"; try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement().execute(ddl); String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; PreparedStatement stmt = conn.prepareStatement(dml); for (int i = 1; i <= numRows; i++) { - String key = "key" + i; - stmt.setString(1, key); + stmt.setInt(1, i); stmt.setString(2, "value" + i); stmt.executeUpdate(); } @@ -99,10 +101,51 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT { } assertEquals("Unexpected number of records returned", 1000, count); } catch (Exception e) { - fail("Expected query to suceed"); + fail("Expected query to succeed"); } } + @Test + public void testScanningResultIteratorQueryTimeoutForPagingWithVeryLowTimeout() throws Exception { + //Arrange + PreparedStatement ps = loadDataAndPreparePagedQuery(1,1); + + //Act + Assert + try { + //Do not let BaseResultIterators throw Timeout Exception Let ScanningResultIterator handle it. + BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true); + ResultSet rs = ps.executeQuery(); + while(rs.next()) {} + fail("Expected query to timeout with a 1 ms timeout"); + } catch (SQLException e) { + //OPERATION_TIMED_OUT Exception expected + assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode()); + } finally { + BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false); + } + } + + @Test + public void testScanningResultIteratorQueryTimeoutForPagingWithNormalLowTimeout() throws Exception { + //Arrange + PreparedStatement ps = loadDataAndPreparePagedQuery(30000,30); + + //Act + Assert + try { + //Do not let BaseResultIterators throw Timeout Exception Let ScanningResultIterator handle it. + BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true); + ResultSet rs = ps.executeQuery(); + int count = 0; + while(rs.next()) { + count++; + } + assertEquals("Unexpected number of records returned", 500, count); + } catch (SQLException e) { + fail("Expected query to succeed"); + } finally { + BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false); + } + } //----------------------------------------------------------------- // Private Helper Methods @@ -118,4 +161,18 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT { assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout()); return ps; } + + private PreparedStatement loadDataAndPreparePagedQuery(int timeoutMs, int timeoutSecs) throws Exception { + Properties props = new Properties(); + //Setting Paging Size to 0 and Query Timeout to 1ms so that query get paged quickly and times out immediately + props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, String.valueOf(timeoutMs)); + props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0)); + PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); + PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE K % 2 = 0"); + PhoenixStatement phoenixStmt = ps.unwrap(PhoenixStatement.class); + assertEquals(timeoutMs, phoenixStmt.getQueryTimeoutInMillis()); + assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout()); + assertEquals(0, conn.getQueryServices().getProps().getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1)); + return ps; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 45ec1f09cd..71263c0a6d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -118,6 +118,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -169,6 +170,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private final boolean useStatsForParallelization; protected Map<ImmutableBytesPtr,ServerCache> caches; private final QueryPlan dataPlan; + private static boolean forTestingSetTimeoutToMaxToLetQueryPassHere = false; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -1348,7 +1350,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection()); int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); try { - submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper); + submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime); boolean clearedCache = false; for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size()); @@ -1357,6 +1359,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next(); try { long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis(); + if (forTestingSetTimeoutToMaxToLetQueryPassHere) { + timeOutForScan = Long.MAX_VALUE; + } if (timeOutForScan < 0) { throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage( ". Query couldn't be completed in the allotted time: " @@ -1598,7 +1603,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException; + Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper, + long maxQueryEndTime) throws SQLException; @Override public int size() { @@ -1710,4 +1716,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return this.estimateInfoTimestamp; } + /** + * Used for specific test case to check if timeouts are working in ScanningResultIterator. + * @param setTimeoutToMax + */ + @VisibleForTesting + public static void setForTestingSetTimeoutToMaxToLetQueryPassHere(boolean setTimeoutToMax) { + forTestingSetTimeoutToMaxToLetQueryPassHere = setTimeoutToMax; + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index f584e12f6b..b4940c2cfa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -163,10 +163,11 @@ public class ChunkedResultIterator implements PeekingResultIterator { ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, context.getConnection().getLogLevel()); long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); + //Chunking is deprecated, putting max value for timeout here. ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, - DefaultParallelScanGrouper.getInstance()), chunkSize); + DefaultParallelScanGrouper.getInstance(), Long.MAX_VALUE), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan); } return resultIterator; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java index 44c714f61d..1008b1b939 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -33,9 +33,10 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, - QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, + long maxQueryEndTime) throws SQLException { return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, - plan, scanGrouper, caches); + plan, scanGrouper, caches, maxQueryEndTime); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 828de398f4..5b0b04b05f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -80,7 +80,8 @@ public class ParallelIterators extends BaseResultIterators { @Override protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper, + long maxQueryEndTime) throws SQLException { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -116,7 +117,7 @@ public class ParallelIterators extends BaseResultIterators { final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator( mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan, - scanGrouper, caches); + scanGrouper, caches, maxQueryEndTime); context.getConnection().addIteratorForLeaseRenewal(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 51d7bf5dc5..8189fbcef9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRI import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; +import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS; @@ -62,6 +63,7 @@ import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,14 +79,16 @@ public class ScanningResultIterator implements ResultIterator { private static boolean throwExceptionIfScannerClosedForceFully = false; private final boolean isMapReduceContext; + private final long maxQueryEndTime; - public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext) { + public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) { this.scanner = scanner; this.scanMetricsHolder = scanMetricsHolder; this.context = context; scanMetricsUpdated = false; scanMetricsEnabled = scan.isScanMetricsEnabled(); this.isMapReduceContext = isMapReduceContext; + this.maxQueryEndTime = maxQueryEndTime; } @Override @@ -171,6 +175,12 @@ public class ScanningResultIterator implements ResultIterator { try { Result result = scanner.next(); while (result != null && (result.isEmpty() || isDummy(result))) { + long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis(); + if (timeOutForScan < 0) { + throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage( + ". Query couldn't be completed in the allotted time : " + + context.getStatement().getQueryTimeoutInMillis() + " ms").build().buildException(); + } if (!isMapReduceContext && (context.getConnection().isClosing() || context.getConnection().isClosed())) { LOG.warn("Closing ResultScanner as Connection is already closed or in middle of closing"); if (throwExceptionIfScannerClosedForceFully) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 8674f3f2a6..ff394a3687 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -81,7 +81,7 @@ public class SerialIterators extends BaseResultIterators { @Override protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper, long maxQueryEndTime) { ExecutorService executor = context.getConnection().getQueryServices().getExecutor(); final String tableName = tableRef.getTable().getPhysicalName().getString(); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName); @@ -100,7 +100,7 @@ public class SerialIterators extends BaseResultIterators { Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { - PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches); + PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches, maxQueryEndTime); return itr; } @@ -143,14 +143,16 @@ public class SerialIterators extends BaseResultIterators { private PeekingResultIterator currentIterator; private Integer remainingOffset; private Map<ImmutableBytesPtr,ServerCache> caches; + private final long maxQueryEndTime; - private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException { this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size()); this.tableName = tableName; this.renewLeaseThreshold = renewLeaseThreshold; this.scans.addAll(flattenedScans); this.remainingOffset = offset; this.caches = caches; + this.maxQueryEndTime = maxQueryEndTime; if (this.remainingOffset != null) { // mark the last scan for offset purposes this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE)); @@ -183,7 +185,7 @@ public class SerialIterators extends BaseResultIterators { context.getConnection().getLogLevel()); TableResultIterator itr = new TableResultIterator(mutationState, currentScan, scanMetricsHolder, - renewLeaseThreshold, plan, scanGrouper, caches); + renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime); PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan); Tuple tuple; if ((tuple = peekingItr.peek()) == null) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 5e7e2cfe8f..9ef7eb8200 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -100,6 +100,7 @@ public class TableResultIterator implements ResultIterator { private HashCacheClient hashCacheClient; private final boolean isMapReduceContext; + private final long maxQueryEndTime; @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! TableResultIterator() { @@ -112,6 +113,7 @@ public class TableResultIterator implements ResultIterator { this.caches = null; this.retry = 0; this.isMapReduceContext = false; + this.maxQueryEndTime = Long.MAX_VALUE; } public static enum RenewLeaseStatus { @@ -119,23 +121,23 @@ public class TableResultIterator implements ResultIterator { }; public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, - long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false); + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, long maxQueryEndTime) throws SQLException { + this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false, maxQueryEndTime); } public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, - long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false); + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException { + this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false, maxQueryEndTime); } public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, - long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext) throws SQLException { - this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext); + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext, long maxQueryEndTime) throws SQLException { + this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext, maxQueryEndTime); } public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches, - boolean isMapReduceContext) throws SQLException { + boolean isMapReduceContext, long maxQueryEndTime) throws SQLException { this.scan = scan; this.scanMetricsHolder = scanMetricsHolder; this.plan = plan; @@ -149,6 +151,7 @@ public class TableResultIterator implements ResultIterator { this.retry=plan.getContext().getConnection().getQueryServices().getProps() .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); this.isMapReduceContext = isMapReduceContext; + this.maxQueryEndTime = maxQueryEndTime; ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection()); } @@ -246,7 +249,7 @@ public class TableResultIterator implements ResultIterator { if (delegate == UNINITIALIZED_SCANNER) { try { this.scanIterator = - new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext); + new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext, maxQueryEndTime); } catch (IOException e) { Closeables.closeQuietly(htable); throw ServerUtil.parseServerException(e); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java index 0b28d5a2f2..fb573bfb23 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -31,6 +31,7 @@ import org.apache.phoenix.schema.TableRef; public interface TableResultIteratorFactory { public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, - QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException; + QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, + long maxQueryEndTime) throws SQLException; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java index da3f6becc1..b047dbccb0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java @@ -77,8 +77,9 @@ public class TableSnapshotResultIterator implements ResultIterator { private StatementContext context; private final boolean isMapReduceContext; + private final long maxQueryEndTime; - public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext) + public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) throws IOException { this.configuration = configuration; this.currentRegion = -1; @@ -97,6 +98,7 @@ public class TableSnapshotResultIterator implements ResultIterator { this.rootDir = CommonFSUtils.getRootDir(configuration); this.fs = rootDir.getFileSystem(configuration); this.isMapReduceContext = isMapReduceContext; + this.maxQueryEndTime = maxQueryEndTime; init(); } @@ -158,7 +160,7 @@ public class TableSnapshotResultIterator implements ResultIterator { RegionInfo hri = regions.get(this.currentRegion); this.scanIterator = new ScanningResultIterator(new SnapshotScanner(configuration, fs, restoreDir, htd, hri, scan), - scan, scanMetricsHolder, context, isMapReduceContext); + scan, scanMetricsHolder, context, isMapReduceContext, maxQueryEndTime); } catch (Throwable e) { throw ServerUtil.parseServerException(e); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 970eb6f074..014e47b4e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -48,6 +48,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +132,9 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); + //Get QueryTimeout From Statement + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + final long maxQueryEndTime = startTime + queryPlan.getContext().getStatement().getQueryTimeoutInMillis(); PeekingResultIterator peekingResultIterator; ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, @@ -138,7 +142,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null if (snapshotName != null) { // result iterator to read snapshots final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan, - scanMetricsHolder, queryPlan.getContext(), true); + scanMetricsHolder, queryPlan.getContext(), true, maxQueryEndTime); peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator); LOGGER.info("Adding TableSnapshotResultIterator for scan: " + scan); } else { @@ -146,7 +150,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null new TableResultIterator( queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder, renewScannerLeaseThreshold, queryPlan, - this.scanGrouper, true); + this.scanGrouper, true, maxQueryEndTime); peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); LOGGER.info("Adding TableResultIterator for scan: " + scan); }