Repository: phoenix Updated Branches: refs/heads/master 54a8f2730 -> 97fe4f8aa
PHOENIX-4287 Incorrect aggregate query results when stats are disabled for parallelization Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/97fe4f8a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/97fe4f8a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/97fe4f8a Branch: refs/heads/master Commit: 97fe4f8aa24d2b0cdf9d1418252b4d69cfb6e7a1 Parents: 54a8f27 Author: Samarth Jain <[email protected]> Authored: Tue Oct 31 10:12:22 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Tue Oct 31 10:12:32 2017 -0700 ---------------------------------------------------------------------- .../end2end/ExplainPlanWithStatsEnabledIT.java | 209 ++++++++++++++++++- .../phoenix/iterate/BaseResultIterators.java | 55 +++-- 2 files changed, 246 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/97fe4f8a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java index 62538af..931c398 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.sql.Connection; @@ -387,11 +388,8 @@ public class
ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT { @Test public void testBytesRowsForSelectOnTenantViews() throws Exception { String tenant1View = generateUniqueName(); - ; String tenant2View = generateUniqueName(); - ; String tenant3View = generateUniqueName(); - ; String multiTenantBaseTable = generateUniqueName(); String tenant1 = "tenant1"; String tenant2 = "tenant2"; @@ -504,6 +502,211 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT { } } + @Test // See https://issues.apache.org/jira/browse/PHOENIX-4287 + public void testEstimatesForAggregateQueries() throws Exception { + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + int guidePostWidth = 20; + String ddl = + "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)" + + " GUIDE_POSTS_WIDTH=" + guidePostWidth + + ", USE_STATS_FOR_PARALLELIZATION=false"; + byte[][] splits = + new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) }; + BaseTest.createTestTable(getUrl(), ddl, splits, null); + conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)"); + conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (102,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (103,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (104,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (105,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (106,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (107,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (108,2,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (109,2,4)"); + conn.commit(); + conn.createStatement().execute("UPDATE 
STATISTICS " + tableName + ""); + } + List<Object> binds = Lists.newArrayList(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String sql = "SELECT COUNT(*) " + " FROM " + tableName; + ResultSet rs = conn.createStatement().executeQuery(sql); + assertTrue(rs.next()); + assertEquals(10, rs.getInt(1)); + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10l, info.getEstimatedRows()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Now let's make sure that when using stats for parallelization, our estimates + // and query results stay the same + conn.createStatement().execute( + "ALTER TABLE " + tableName + " SET USE_STATS_FOR_PARALLELIZATION=true"); + rs = conn.createStatement().executeQuery(sql); + assertTrue(rs.next()); + assertEquals(10, rs.getInt(1)); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10l, info.getEstimatedRows()); + assertTrue(info.getEstimateInfoTs() > 0); + } + } + + @Test + public void testSelectQueriesWithStatsForParallelizationOff() throws Exception { + testSelectQueriesWithFilters(false); + } + + @Test + public void testSelectQueriesWithStatsForParallelizationOn() throws Exception { + testSelectQueriesWithFilters(true); + } + + private void testSelectQueriesWithFilters(boolean useStatsForParallelization) throws Exception { + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + int guidePostWidth = 20; + String ddl = + "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)" + + " GUIDE_POSTS_WIDTH=" + guidePostWidth + + ", USE_STATS_FOR_PARALLELIZATION=" + useStatsForParallelization; + byte[][] splits = + new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) }; + BaseTest.createTestTable(getUrl(), ddl, splits, null); + conn.createStatement().execute("upsert into " + tableName + " values (100,100,3)"); + conn.createStatement().execute("upsert into " + tableName + " values 
(101,101,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (102,102,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (103,103,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (104,104,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (105,105,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (106,106,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (107,107,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (108,108,4)"); + conn.createStatement().execute("upsert into " + tableName + " values (109,109,4)"); + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + tableName + ""); + } + List<Object> binds = Lists.newArrayList(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + // query whose start key is before any data + String sql = "SELECT a FROM " + tableName + " WHERE K >= 99"; + ResultSet rs = conn.createStatement().executeQuery(sql); + int i = 0; + int numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10l, info.getEstimatedRows()); + assertEquals((Long) 930l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // query whose start key is after any data + sql = "SELECT a FROM " + tableName + " WHERE K >= 110"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0l, info.getEstimatedRows()); + assertEquals((Long) 0l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose end key is before any data + sql = "SELECT a FROM " + tableName + " WHERE K <= 98"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = 
getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0l, info.getEstimatedRows()); + assertEquals((Long) 0l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose end key is after any data. In this case, we return the estimate as + // scanning all the guide posts. + sql = "SELECT a FROM " + tableName + " WHERE K <= 110"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + assertEquals(numRows, i); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 10l, info.getEstimatedRows()); + assertEquals((Long) 930l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key and end key is before any data. In this case, we return the + // estimate as + // scanning the first guide post + sql = "SELECT a FROM " + tableName + " WHERE K <= 90 AND K >= 80"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0l, info.getEstimatedRows()); + assertEquals((Long) 0l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key and end key is after any data. In this case, we return the + // estimate as + // scanning no guide post + sql = "SELECT a FROM " + tableName + " WHERE K <= 130 AND K >= 120"; + rs = conn.createStatement().executeQuery(sql); + assertFalse(rs.next()); + info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 0l, info.getEstimatedRows()); + assertEquals((Long) 0l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + + // Query whose start key is before and end key is between data. 
In this case, we return + // the estimate as + // scanning no guide post + sql = "SELECT a FROM " + tableName + " WHERE K <= 102 AND K >= 90"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 3; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 4l, info.getEstimatedRows()); + assertEquals((Long) 330l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key is between and end key is after data. + sql = "SELECT a FROM " + tableName + " WHERE K <= 120 AND K >= 100"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. + assertEquals((Long) 9l, info.getEstimatedRows()); + assertEquals((Long) 900l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + // Query whose start key and end key are both between data. + sql = "SELECT a FROM " + tableName + " WHERE K <= 109 AND K >= 100"; + rs = conn.createStatement().executeQuery(sql); + i = 0; + numRows = 10; + while (rs.next()) { + assertEquals(100 + i, rs.getInt(1)); + i++; + } + info = getByteRowEstimates(conn, sql, binds); + // Depending on the guidepost boundary, this estimate + // can be slightly off. It's called estimate for a reason. 
+ assertEquals((Long) 9l, info.getEstimatedRows()); + assertEquals((Long) 900l, info.getEstimatedBytes()); + assertTrue(info.getEstimateInfoTs() > 0); + } + } + private static void createMultitenantTableAndViews(String tenant1View, String tenant2View, String tenant3View, String tenant1, String tenant2, String tenant3, String multiTenantTable, MyClock clock) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/97fe4f8a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 250cb48..e9deec3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.EOFException; +import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.BitSet; @@ -585,15 +586,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return context.getConnection().getQueryServices().getTableStats(key); } - private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) { + private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, + byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation, + GuidePostEstimate estimate, Long gpsRows, Long gpsBytes) { boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary); if (scan != null) { if (regionLocation.getServerName() != null) { 
scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes()); } - scans.add(scan); + if (useStatsForParallelization || crossedRegionBoundary) { + scans.add(scan); + } + if (estimate != null && gpsRows != null) { + estimate.rowsEstimate += gpsRows; + } + if (estimate != null && gpsBytes != null) { + estimate.bytesEstimate += gpsBytes; + } } - if (startNewScan && !scans.isEmpty()) { + if (startNewScan && !scans.isEmpty() && useStatsForParallelization) { + /* + * Note that even if region boundary was crossed, if we are not using stats for + * parallelization, nothing gets added to the parallel scans. + */ parallelScans.add(scans); scans = Lists.newArrayListWithExpectedSize(1); } @@ -653,7 +668,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result newScan.setStopRow(regionInfo.getEndKey()); } } - scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); + scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null); regionIndex++; } if (!scans.isEmpty()) { // Add any remaining scans @@ -662,6 +677,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return parallelScans; } + private static class GuidePostEstimate { + private long bytesEstimate; + private long rowsEstimate; + } + /** * Compute the list of parallel scans to run for a given query. 
The inner scans * may be concatenated together directly, while the other ones may need to be @@ -721,8 +741,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result DataInput input = null; PrefixByteDecoder decoder = null; int guideIndex = 0; - long estimatedRows = 0; - long estimatedSize = 0; + GuidePostEstimate estimates = new GuidePostEstimate(); long estimateTs = Long.MAX_VALUE; long minGuidePostTimestamp = Long.MAX_VALUE; try { @@ -763,6 +782,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result endRegionKey = regionInfo.getEndKey(); keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); } + byte[] initialKeyBytes = currentKeyBytes; while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) { Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, false); @@ -770,12 +790,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); - estimatedRows += gps.getRowCounts()[guideIndex]; - estimatedSize += gps.getByteCounts()[guideIndex]; - } - if (useStatsForParallelization) { - scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation); } + scans = + addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, + regionLocation, estimates, gps.getRowCounts()[guideIndex], + gps.getByteCounts()[guideIndex]); currentKeyBytes = currentGuidePostBytes; try { currentGuidePost = PrefixByteCodec.decode(decoder, input); @@ -794,12 +813,19 @@ public abstract class BaseResultIterators extends ExplainTable implements Result intersectWithGuidePosts = false; } } + if (!useStatsForParallelization) { + /* + * If we are not using stats for generating parallel scans, we need to reset the + * currentKey 
back to what it was at the beginning of the loop. + */ + currentKeyBytes = initialKeyBytes; + } Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true); if(newScan != null) { ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); } - scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); + scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null); currentKeyBytes = endKey; regionIndex++; } @@ -814,8 +840,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result this.estimatedSize = gps.getByteCounts()[0]; this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0]; } else if (hasGuidePosts) { - this.estimatedRows = estimatedRows; - this.estimatedSize = estimatedSize; + this.estimatedRows = estimates.rowsEstimate; + this.estimatedSize = estimates.bytesEstimate; this.estimateInfoTimestamp = estimateTs; } else { this.estimatedRows = null; @@ -828,7 +854,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } finally { if (stream != null) Closeables.closeQuietly(stream); } - sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate()); return parallelScans; }
