This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.16 by this push:
     new 9dd24ff  PHOENIX-6211 Paged scan filters
9dd24ff is described below

commit 9dd24ffd5c4258ee284cbf690b58b018bc3199de
Author: Kadir Ozdemir <kozde...@salesforce.com>
AuthorDate: Sun Dec 6 18:48:45 2020 -0800

    PHOENIX-6211 Paged scan filters
---
 .../apache/phoenix/end2end/SpillableGroupByIT.java |   2 +-
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 285 +++++++++++----------
 .../phoenix/coprocessor/BaseRegionScanner.java     |   6 +
 .../coprocessor/BaseScannerRegionObserver.java     |  22 +-
 .../coprocessor/GlobalIndexRegionScanner.java      |  47 +++-
 .../GroupedAggregateRegionObserver.java            |  50 ++--
 .../phoenix/coprocessor/HashJoinRegionScanner.java |  29 ++-
 .../coprocessor/IndexRebuildRegionScanner.java     |   4 +
 .../coprocessor/IndexRepairRegionScanner.java      |   4 +
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   6 +
 .../phoenix/coprocessor/PagedRegionScanner.java    | 104 ++++++++
 .../coprocessor/PhoenixTTLRegionObserver.java      |  62 +++--
 .../UngroupedAggregateRegionObserver.java          |  28 +-
 .../UngroupedAggregateRegionScanner.java           |  29 +--
 .../org/apache/phoenix/filter/DelegateFilter.java  |  11 +
 .../org/apache/phoenix/filter/PagedFilter.java     | 267 +++++++++++++++++++
 .../apache/phoenix/index/GlobalIndexChecker.java   |  67 +++--
 .../iterate/NonAggregateRegionScannerFactory.java  |  20 +-
 .../phoenix/iterate/OffsetResultIterator.java      |  16 +-
 .../phoenix/iterate/OrderedResultIterator.java     |  52 +++-
 .../phoenix/iterate/RegionScannerFactory.java      |  29 ++-
 .../iterate/RegionScannerResultIterator.java       |   7 +
 .../phoenix/iterate/ScanningResultIterator.java    |   2 +-
 .../phoenix/iterate/TableResultIterator.java       |  14 +-
 .../org/apache/phoenix/query/QueryServices.java    |   5 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   2 -
 .../tuple/EncodedColumnQualiferCellsList.java      |   7 +
 .../apache/phoenix/schema/tuple/ResultTuple.java   |   1 -
 .../java/org/apache/phoenix/util/ScanUtil.java     |  91 ++++++-
 .../java/org/apache/phoenix/query/BaseTest.java    |   7 +-
 30 files changed, 990 insertions(+), 286 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index 1fd8a65..eb379a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -82,7 +82,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
         props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false));
         props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
         props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
-        props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000));
+        props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(60000));
         // Must update config before starting server
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index c9fb506..1fd6395 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -54,10 +54,12 @@ import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
 import
org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -760,7 +762,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { GlobalViewIndexOptions globalViewIndexOptions = SchemaBuilder.GlobalViewIndexOptions.withDefaults(); - globalViewIndexOptions.setLocal(false); TenantViewOptions tenantViewOptions = new TenantViewOptions(); tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); @@ -776,65 +777,69 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { testCaseWhenAllCFMatchAndAllDefault .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); - // Define the test schema. - final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); - schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) - .withGlobalViewIndexOptions(globalViewIndexOptions) - .withTenantViewOptions(tenantViewOptions) - .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); + for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) { + globalViewIndexOptions.setLocal(isGlobalIndexLocal); - // Define the test data. - final List<String> outerCol4s = Lists.newArrayList(); - DataSupplier dataSupplier = new DataSupplier() { - String col4ForWhereClause; + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); - @Override public List<Object> getValues(int rowIndex) { - Random rnd = new Random(); - String id = String.format(ID_FMT, rowIndex); - String zid = String.format(ZID_FMT, rowIndex); - String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + // Define the test data. 
+ final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { + String col4ForWhereClause; - // Store the col4 data to be used later in a where clause - outerCol4s.add(col4); - String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format(ID_FMT, rowIndex); + String zid = String.format(ZID_FMT, rowIndex); + String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - return Lists - .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); - } - }; + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); + String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - // Create a test data reader/writer for the above schema. - DataWriter dataWriter = new BasicDataWriter(); - DataReader dataReader = new BasicDataReader(); + return Lists + .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); + } + }; - List<String> columns = - Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); - List<String> rowKeyColumns = Lists.newArrayList("COL6"); - String tenantConnectUrl = - getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); - try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { - writeConnection.setAutoCommit(true); - dataWriter.setConnection(writeConnection); - dataWriter.setDataSupplier(dataSupplier); - dataWriter.setUpsertColumns(columns); - dataWriter.setRowKeyColumns(rowKeyColumns); - dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + // Create a test data reader/writer for the above schema. 
+ DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); - // Upsert data for validation - upsertData(dataWriter, DEFAULT_NUM_ROWS); + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); - dataReader.setValidationColumns(rowKeyColumns); - dataReader.setRowKeyColumns(rowKeyColumns); - dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", - schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); - dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); - // Validate data before and after ttl expiration. - validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } } } @@ -858,7 +863,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { GlobalViewIndexOptions globalViewIndexOptions = SchemaBuilder.GlobalViewIndexOptions.withDefaults(); - globalViewIndexOptions.setLocal(false); TenantViewOptions tenantViewOptions = new TenantViewOptions(); tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); @@ -874,101 +878,105 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { testCaseWhenAllCFMatchAndAllDefault .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); - // Define the test schema. - final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); - schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) - .withGlobalViewIndexOptions(globalViewIndexOptions) - .withTenantViewOptions(tenantViewOptions) - .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); - - // Define the test data. 
- final List<String> outerCol4s = Lists.newArrayList(); - DataSupplier dataSupplier = new DataSupplier() { - - @Override public List<Object> getValues(int rowIndex) { - Random rnd = new Random(); - String id = String.format(ID_FMT, rowIndex); - String zid = String.format(ZID_FMT, rowIndex); - String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - - // Store the col4 data to be used later in a where clause - outerCol4s.add(col4); - String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - - return Lists - .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); - } - }; - - // Create a test data reader/writer for the above schema. - DataWriter dataWriter = new BasicDataWriter(); - DataReader dataReader = new BasicDataReader(); - - List<String> columns = - Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); - List<String> nonCoveredColumns = - Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9"); - List<String> rowKeyColumns = Lists.newArrayList("COL6"); - String tenantConnectUrl = - getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); - try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { - writeConnection.setAutoCommit(true); - dataWriter.setConnection(writeConnection); - dataWriter.setDataSupplier(dataSupplier); - dataWriter.setUpsertColumns(columns); - dataWriter.setRowKeyColumns(rowKeyColumns); - dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); - - // Upsert data for validation - upsertData(dataWriter, DEFAULT_NUM_ROWS); - - dataReader.setValidationColumns(rowKeyColumns); - dataReader.setRowKeyColumns(rowKeyColumns); - dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", - schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); - dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); - - // Validate data before and after ttl expiration. - validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) { + globalViewIndexOptions.setLocal(isGlobalIndexLocal); - // Now update the above data but not modifying the covered columns. - // Ensure/validate that empty columns for the index are still updated. + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); - // Data supplier where covered and included (col4 and col6) columns are not updated. - DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() { + // Define the test data. 
+ final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { @Override public List<Object> getValues(int rowIndex) { Random rnd = new Random(); String id = String.format(ID_FMT, rowIndex); String zid = String.format(ZID_FMT, rowIndex); + String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); - return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 }); + return Lists + .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); } }; - // Upsert data for validation with non covered columns - dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns); - dataWriter.setUpsertColumns(nonCoveredColumns); - upsertData(dataWriter, DEFAULT_NUM_ROWS); + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); - List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6"); - dataReader.setValidationColumns(rowKeyColumns1); - dataReader.setRowKeyColumns(rowKeyColumns1); - dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'", - schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> nonCoveredColumns = + Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); - // Validate data before and after ttl expiration. - validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Now update the above data but not modifying the covered columns. + // Ensure/validate that empty columns for the index are still updated. + + // Data supplier where covered and included (col4 and col6) columns are not updated. 
+ DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format(ID_FMT, rowIndex); + String zid = String.format(ZID_FMT, rowIndex); + String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 }); + } + }; + + // Upsert data for validation with non covered columns + dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns); + dataWriter.setUpsertColumns(nonCoveredColumns); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6"); + dataReader.setValidationColumns(rowKeyColumns1); + dataReader.setRowKeyColumns(rowKeyColumns1); + dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } } + } @@ -2370,6 +2378,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { Properties props = new Properties(); long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis(); props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { dataReader.setConnection(readConnection); @@ -2487,7 +2496,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { final Statement statement = deleteConnection.createStatement()) { deleteConnection.setAutoCommit(true); - final String deleteIfExpiredStatement = String.format("select * from %s", viewName); + final String deleteIfExpiredStatement = String.format("select /*+NO_INDEX*/ count(*) from %s", viewName); Preconditions.checkNotNull(deleteIfExpiredStatement); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); @@ -2506,6 +2515,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName); scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL()))); + PhoenixResultSet rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext()); while (rs.next()); @@ -2527,7 +2537,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { final Statement statement = deleteConnection.createStatement()) { deleteConnection.setAutoCommit(true); - final String deleteIfExpiredStatement = String.format("select * from %s", indexName); + final String deleteIfExpiredStatement = String.format("select count(*) from %s", indexName); Preconditions.checkNotNull(deleteIfExpiredStatement); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); @@ -2597,4 +2607,19 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { return testCases; } + + private void runValidations(long phoenixTTL, + com.google.common.collect.Table<String, String, Object> table, + DataReader dataReader, SchemaBuilder 
schemaBuilder) + throws Exception { + + //Insert for the first time and validate them. + validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table, + dataReader, schemaBuilder); + + // Update the above rows and validate the same. + validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table, + dataReader, schemaBuilder); + + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java index 945c1c4..5c54854 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java @@ -22,6 +22,8 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -58,4 +60,8 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner { public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment"); } + + public RegionScanner getNewRegionScanner(Scan scan) throws IOException { + return ((BaseRegionScanner)delegate).getNewRegionScanner(scan); + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index e028e41..772d1c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.filter.PagedFilter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; @@ -62,6 +63,8 @@ import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter; + abstract public class BaseScannerRegionObserver extends BaseRegionObserver { private static final Logger LOG = LoggerFactory.getLogger(BaseScannerRegionObserver.class); @@ -86,10 +89,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging"; // The number of index rows to be rebuild in one RPC call public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows"; - public static final String SERVER_PAGING = "_ServerPaging"; - // The number of rows to be scanned in one RPC call - public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs"; - + public static final String SERVER_PAGE_SIZE_MS = "_ServerPageSizeMs"; // Index verification type done by the index tool public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType"; public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify"; @@ -190,7 +190,6 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver { return this.getClass().getName(); } - private static void throwIfScanOutOfRegion(Scan scan, Region region) throws DoNotRetryIOException { boolean isLocalIndex = ScanUtil.isLocalIndex(scan); byte[] lowerInclusiveScanKey = scan.getStartRow(); @@ -254,6 +253,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // last possible moment. You need to swap the start/stop and make the // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); + if (!(scan.getFilter() instanceof PagedFilter)) { + byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS); + if (pageSizeMsBytes != null) { + scan.setFilter(new PagedFilter(scan.getFilter(), getPageSizeMsForFilter(scan))); + } + } } return s; } @@ -355,11 +360,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public final RegionScanner postScannerOpen( final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { - try { + try { if (!isRegionObserverFor(scan)) { return s; } - return new RegionScannerHolder(c, scan, s); + // Make sure PageRegionScanner wraps only the lowest region scanner, i.e., HBase region scanner. We assume + // here every Phoenix region scanner extends BaseRegionScanner + return new RegionScannerHolder(c, scan, s instanceof BaseRegionScanner ? s : + new PagedRegionScanner(c.getEnvironment().getRegion(), s, scan)); } catch (Throwable t) { // If the exception is NotServingRegionException then throw it as // StaleRegionBoundaryCacheException to handle it by phoenix client other wise hbase diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index cc2772d..540851a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; @@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter; +import org.apache.phoenix.filter.PagedFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -95,6 +97,7 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputReposito import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; +import static org.apache.phoenix.util.ScanUtil.isDummy; /** * This is an abstract region scanner which is used to scan index or data table rows locally. 
From the data table rows, @@ -1391,6 +1394,35 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { return indexMutations.size(); } + static boolean adjustScanFilter(Scan scan) { + // For rebuilds we use count (*) as query for regular tables which ends up setting the FirstKeyOnlyFilter on scan + // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col + // For rebuilds we need all columns and all versions + + Filter filter = scan.getFilter(); + if (filter instanceof PagedFilter) { + PagedFilter pageFilter = (PagedFilter) filter; + Filter delegateFilter = pageFilter.getDelegateFilter(); + if (delegateFilter == null || delegateFilter instanceof FirstKeyOnlyFilter) { + scan.setFilter(null); + return true; + } + if (delegateFilter instanceof FirstKeyOnlyFilter) { + pageFilter.setDelegateFilter(null); + } else if (delegateFilter != null) { + // Override the filter so that we get all versions + pageFilter.setDelegateFilter(new AllVersionsIndexRebuildFilter(delegateFilter)); + } + } else if (filter instanceof FirstKeyOnlyFilter) { + scan.setFilter(null); + return true; + } else if (filter != null) { + // Override the filter so that we get all versions + scan.setFilter(new AllVersionsIndexRebuildFilter(filter)); + } + return false; + } + protected RegionScanner getLocalScanner() throws IOException { // override the filter to skip scan and open new scanner // when lower bound of timerange is passed or newStartKey was populated @@ -1405,20 +1437,12 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { for (byte[] family : scan.getFamilyMap().keySet()) { incrScan.addFamily(family); } - // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan - // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col - // For rebuilds we need all columns and all versions - if (scan.getFilter() instanceof FirstKeyOnlyFilter) { - incrScan.setFilter(null); - } else if (scan.getFilter() != null) { - // Override the filter so that we get all versions - incrScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter())); - } + adjustScanFilter(incrScan); if(nextStartKey != null) { incrScan.setStartRow(nextStartKey); } List<KeyRange> keys = new ArrayList<>(); - try(RegionScanner scanner = region.getScanner(incrScan)) { + try(RegionScanner scanner = new PagedRegionScanner(region, region.getScanner(incrScan), incrScan)) { List<Cell> row = new ArrayList<>(); int rowCount = 0; // collect row keys that have been modified in the given time-range @@ -1427,6 +1451,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { ungroupedAggregateRegionObserver.checkForRegionClosing(); hasMoreIncr = scanner.nextRaw(row); if (!row.isEmpty()) { + if (isDummy(row)) { + continue; + } keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0)))); rowCount++; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index cf2a094..5b54601 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -22,10 +22,11 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static 
org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; -import static org.apache.phoenix.query.QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE; import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -64,7 +65,6 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; @@ -163,7 +163,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { if (j != null) { innerScanner = - new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), + new HashJoinRegionScanner(innerScanner, scan, p, j, ScanUtil.getTenantId(scan), c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier); } @@ -172,22 +172,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { if (limitBytes != null) { limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault()); } - long pageSizeInMs = Long.MAX_VALUE; - if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) { - byte[] pageSizeFromScan = - scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS); - if (pageSizeFromScan != null) { - pageSizeInMs = Bytes.toLong(pageSizeFromScan); - } else { - pageSizeInMs = c.getEnvironment().getConfiguration().getLong(GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, - QueryServicesOptions.DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS); - } - } + long pageSizeMs = getPageSizeMsForRegionScanner(scan); if (keyOrdered) { // Optimize by taking advantage that the rows are // already in the required group by key order - return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs); + return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs); } else { // Otherwse, collect them all up in an in memory map - return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs); + return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs); } } } @@ -410,14 +400,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private final ServerAggregators aggregators; private final long limit; private final List<Expression> expressions; - private final long pageSizeInMs; + private final long pageSizeMs; private RegionScanner regionScanner = null; private final boolean spillableEnabled; private final GroupByCache groupByCache; private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, 
- final ServerAggregators aggregators, final long limit, final long pageSizeInMs) { + final ServerAggregators aggregators, final long limit, final long pageSizeMs) { super(scanner); this.region = c.getEnvironment().getRegion(); minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); @@ -425,7 +415,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); this.aggregators = aggregators; this.limit = limit; - this.pageSizeInMs = pageSizeInMs; + this.pageSizeMs = pageSizeMs; this.expressions = expressions; RegionCoprocessorEnvironment env = c.getEnvironment(); Configuration conf = env.getConfiguration(); @@ -477,6 +467,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // ones returned hasMore = delegate.nextRaw(results); if (!results.isEmpty()) { + if (isDummy(results)) { + getDummyResult(resultsToReturn); + return true; + } result.setKeyValues(results); ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, expressions); @@ -485,8 +479,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { aggregators.aggregate(rowAggregators, result); } now = EnvironmentEdgeManager.currentTimeMillis(); - } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs); - if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) { + } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeMs); + if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) { // Return a dummy result as we have processed a page worth of rows // but we are not ready to aggregate getDummyResult(resultsToReturn); @@ -529,13 +523,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private final long limit; private Aggregator[] rowAggregators; private final List<Expression> expressions; - private final long pageSizeInMs; + private final long pageSizeMs; private long rowCount = 0; private ImmutableBytesPtr currentKey = null; private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, final long limit, final long pageSizeInMs) { + final ServerAggregators aggregators, final long limit, final long pageSizeMs) { super(scanner); this.scan = scan; this.region = c.getEnvironment().getRegion(); @@ -545,7 +539,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { this.aggregators = aggregators; rowAggregators = aggregators.getAggregators(); this.limit = limit; - this.pageSizeInMs = pageSizeInMs; + this.pageSizeMs = pageSizeMs; this.expressions = expressions; } @@ -579,6 +573,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // ones returned hasMore = delegate.nextRaw(kvs); if (!kvs.isEmpty()) { + if (isDummy(kvs)) { + getDummyResult(results); + return true; + } result.setKeyValues(kvs); key = TupleUtil.getConcatenatedValue(result, expressions); aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; @@ -598,12 +596,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // Do rowCount + 1 b/c we don't have to wait for a complete // row in the case of a DISTINCT with a LIMIT now = EnvironmentEdgeManager.currentTimeMillis(); - } while (hasMore && !aggBoundary && !atLimit && (now - 
startTime) < pageSizeInMs); + } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs); } } finally { if (acquiredLock) region.closeRegionOperation(); } - if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) { + if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) { // Return a dummy result as we have processed a page worth of rows // but we are not ready to aggregate getDummyResult(results); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 6676a1c..108f767 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -26,9 +26,11 @@ import java.util.Queue; import java.util.Set; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -52,9 +54,14 @@ import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; +import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; + public class HashJoinRegionScanner implements RegionScanner { private final RegionScanner scanner; @@ -72,20 +79,21 @@ public class HashJoinRegionScanner implements RegionScanner { private final boolean useQualifierAsListIndex; private final boolean useNewValueColumnQualifier; private final boolean addArrayCell; + private final long pageSizeMs; @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, + public HashJoinRegionScanner(RegionScanner scanner, Scan scan, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException { - this(env, scanner, null, null, projector, joinInfo, + this(env, scanner, scan, null, null, projector, joinInfo, tenantId, useQualifierAsIndex, useNewValueColumnQualifier); } @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner, + public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner, Scan scan, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, @@ -137,6 +145,7 @@ public class HashJoinRegionScanner implements RegionScanner { this.useNewValueColumnQualifier = useNewValueColumnQualifier; this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs != null && arrayKVRefs.size() > 0); + this.pageSizeMs = 
getPageSizeMsForRegionScanner(scan); } private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { @@ -288,9 +297,23 @@ public class HashJoinRegionScanner implements RegionScanner { @Override public boolean nextRaw(List<Cell> result) throws IOException { try { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (shouldAdvance()) { hasMore = scanner.nextRaw(result); + if (isDummy(result)) { + return true; + } + if (result.isEmpty()) { + return hasMore; + } + Cell cell = result.get(0); processResults(result, false); + if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + byte[] rowKey = CellUtil.cloneRow(cell); + result.clear(); + getDummyResult(rowKey, result); + return true; + } result.clear(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index fa1ca40..f51f07f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.IOException; import java.util.ArrayList; @@ -313,6 +314,9 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { hasMore = localScanner.nextRaw(row); if (!row.isEmpty()) { lastCell = row.get(0); // lastCell is any cell from the last visited row + if (isDummy(row)) { + break; + } Put put = null; Delete del = null; for (Cell cell : row) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java index 5e69925..2a67852 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.IOException; import java.util.ArrayList; @@ -385,6 +386,9 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner { hasMore = localScanner.nextRaw(row); if (!row.isEmpty()) { lastCell = row.get(0); // lastCell is any cell from the last visited row + if (isDummy(row)) { + break; + } indexMutationCount += populateIndexMutationFromIndexRow(row, indexMutationMap); rowCount++; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index 7f61be3..85a12b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -23,13 +23,16 @@ import static 
org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -365,6 +368,9 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
                 hasMore = innerScanner.nextRaw(row);
                 if (!row.isEmpty()) {
                     lastCell = row.get(0); // lastCell is any cell from the last visited row
+                    if (isDummy(row)) {
+                        break;
+                    }
                     Put put = null;
                     Delete del = null;
                     for (Cell cell : row) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
new file mode 100644
index 0000000..471839b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.filter.PagedFilter;
+
+import static org.apache.phoenix.compat.hbase.CompatUtil.setStartRow;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter;
+
+/**
+ * PagedRegionScanner works with PagedFilter to make sure that the time between two rows returned by the HBase region
+ * scanner should not exceed the configured page size in ms (on PagedFilter). When the page size is reached (because
+ * there are too many cells/rows to be filtered out), PagedFilter stops the HBase region scanner and sets its state
+ * to STOPPED. In this case, the HBase region scanner next() returns false and PagedFilter#isStopped() returns true.
+ * PagedRegionScanner is responsible for detecting PagedFilter has stopped the scanner, and then closing the current
+ * HBase region scanner, starting a new one to resume the scan operation and returning a dummy result to signal to
+ * Phoenix client to resume the scan operation by skipping this dummy result and calling ResultScanner#next().
+ */
+public class PagedRegionScanner extends BaseRegionScanner {
+    protected Region region;
+    protected Scan scan;
+    protected PagedFilter pageFilter;
+
+    public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) {
+        super(scanner);
+        this.region = region;
+        this.scan = scan;
+        pageFilter = getPhoenixPagedFilter(scan);
+        if (pageFilter != null) {
+            pageFilter.init();
+        }
+    }
+
+    private boolean next(List<Cell> results, boolean raw) throws IOException {
+        try {
+            boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
+            if (pageFilter == null) {
+                return hasMore;
+            }
+            if (!hasMore) {
+                // There is no more row from the HBase region scanner. We need to check if PageFilter
+                // has stopped the region scanner
+                if (pageFilter.isStopped()) {
+                    // Close the current region scanner, start a new one and return a dummy result
+                    delegate.close();
+                    byte[] rowKey = pageFilter.getRowKeyAtStop();
+                    setStartRow(scan, rowKey, true);
+                    delegate = region.getScanner(scan);
+                    if (results.isEmpty()) {
+                        getDummyResult(rowKey, results);
+                    }
+                    pageFilter.init();
+                    return true;
+                }
+                return false;
+            } else {
+                // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
+                // start a new page on the next next() call.
+                pageFilter.resetStartTime();
+                return true;
+            }
+        } catch (Exception e) {
+            pageFilter.init();
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        return next(results, false);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results) throws IOException {
+        return next(results, true);
+    }
+
+    @Override
+    public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+        return new PagedRegionScanner(region, region.getScanner(scan), scan);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index f6c1b96..d873286 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -18,6 +18,7 @@ package org.apache.phoenix.coprocessor;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -41,35 +43,35 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 /**
 *
Coprocessor that checks whether the row is expired based on the TTL spec. */ -public class PhoenixTTLRegionObserver extends BaseRegionObserver { +public class PhoenixTTLRegionObserver extends BaseScannerRegionObserver { private static final Logger LOG = LoggerFactory.getLogger(PhoenixTTLRegionObserver.class); private MetricsPhoenixTTLSource metricSource; - @Override public void start(CoprocessorEnvironment e) throws IOException { - super.start(e); + @Override + public void start(CoprocessorEnvironment e) throws IOException { metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource(); } - @Override public void stop(CoprocessorEnvironment e) throws IOException { - super.stop(e); + @Override + protected boolean isRegionObserverFor(Scan scan) { + return ScanUtil.isMaskTTLExpiredRows(scan) || ScanUtil.isDeleteTTLExpiredRows(scan); } @Override - public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, - RegionScanner s) throws IOException { - - if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) { - return s; - } else if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) { + protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, + final RegionScanner s) throws IOException, SQLException { + if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) { throw new IOException("Both mask and delete expired rows property cannot be set"); } else if (ScanUtil.isMaskTTLExpiredRows(scan)) { metricSource.incrementMaskExpiredRequestCount(); @@ -99,10 +101,11 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { /** * A region scanner that checks the TTL expiration of rows */ - private static class PhoenixTTLRegionScanner implements RegionScanner { + private static class PhoenixTTLRegionScanner extends BaseRegionScanner { private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR = "MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID"; + private final RegionCoprocessorEnvironment env; private final RegionScanner scanner; private final Scan scan; private final byte[] emptyCF; @@ -119,9 +122,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { private long numRowsScanned; private long numRowsDeleted; private boolean reported = false; + private long pageSizeMs; public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner) throws IOException { + super(scanner); + this.env = env; this.scan = scan; this.scanner = scanner; byte[] requestIdBytes = scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR); @@ -145,6 +151,7 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { now = maxTimestamp != HConstants.LATEST_TIMESTAMP ? 
maxTimestamp : EnvironmentEdgeManager.currentTimeMillis(); + pageSizeMs = getPageSizeMsForRegionScanner(scan); } @Override public int getBatch() { @@ -186,10 +193,6 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { return scanner.getRegionInfo(); } - @Override public boolean isFilterDone() throws IOException { - return scanner.isFilterDone(); - } - @Override public boolean reseek(byte[] row) throws IOException { return scanner.reseek(row); } @@ -204,17 +207,30 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { private boolean doNext(List<Cell> result, boolean raw) throws IOException { try { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); boolean hasMore; do { hasMore = raw ? scanner.nextRaw(result) : scanner.next(result); if (result.isEmpty()) { break; } + if (isDummy(result)) { + return true; + } + + /** + Note : That both MaskIfExpiredRequest and DeleteIfExpiredRequest cannot be set at the same time. + Case : MaskIfExpiredRequest, If row not expired then return. + */ numRowsScanned++; if (maskIfExpired && checkRowNotExpired(result)) { break; } + /** + Case : DeleteIfExpiredRequest, If deleted then return. + So that it will count towards the aggregate deleted count. + */ if (deleteIfExpired && deleteRowIfExpired(result)) { numRowsDeleted++; break; @@ -226,6 +242,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { if (maskIfExpired) { numRowsExpired++; } + if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) { + byte[] rowKey = CellUtil.cloneRow(result.get(0)); + result.clear(); + getDummyResult(rowKey, result); + return true; + } result.clear(); } while (hasMore); return hasMore; @@ -303,5 +325,9 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver { return true; } + @Override + public RegionScanner getNewRegionScanner(Scan scan) throws IOException { + return new PhoenixTTLRegionScanner(env, scan, ((BaseRegionScanner)delegate).getNewRegionScanner(scan)); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index ed4ea74..cdec974 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -440,7 +441,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } if (j != null) { - theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); + theScanner = new HashJoinRegionScanner(theScanner, scan, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); } return new UngroupedAggregateRegionScanner(c, theScanner,region, scan, env, this); } @@ -624,7 +625,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, + private 
RegionScanner rebuildIndices(RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env) throws IOException { boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName()); byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); @@ -633,29 +634,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) { return new IndexerRegionScanner(innerScanner, region, scan, env, this); } + RegionScanner scanner; if (!scan.isRaw()) { Scan rawScan = new Scan(scan); rawScan.setRaw(true); rawScan.setMaxVersions(); rawScan.getFamilyMap().clear(); - // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan - // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col - // For rebuilds we need all columns and all versions - if (scan.getFilter() instanceof FirstKeyOnlyFilter) { - rawScan.setFilter(null); - } else if (scan.getFilter() != null) { - // Override the filter so that we get all versions - rawScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter())); - } + adjustScanFilter(rawScan); rawScan.setCacheBlocks(false); for (byte[] family : scan.getFamilyMap().keySet()) { rawScan.addFamily(family); } + scanner = ((BaseRegionScanner)innerScanner).getNewRegionScanner(rawScan); innerScanner.close(); - RegionScanner scanner = region.getScanner(rawScan); - return getRegionScanner(scanner, region, scan, env, oldCoproc); + } else { + if (adjustScanFilter(scan)) { + scanner = ((BaseRegionScanner) innerScanner).getNewRegionScanner(scan); + innerScanner.close(); + } else { + scanner = innerScanner; + } } - return getRegionScanner(innerScanner, region, scan, env, oldCoproc); + return getRegionScanner(scanner, region, scan, env, oldCoproc); } private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index a96303a..fcee172 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -32,9 +32,10 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB; -import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.IOException; import java.sql.SQLException; @@ -118,7 +119,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class); - private long pageSizeInMs = Long.MAX_VALUE; + private long pageSizeMs; private int maxBatchSize = 0; private final Scan scan; 
private final RegionScanner innerScanner; @@ -169,17 +170,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; this.innerScanner = innerScanner; Configuration conf = env.getConfiguration(); - if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) { - byte[] pageSizeFromScan = - scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS); - if (pageSizeFromScan != null) { - pageSizeInMs = Bytes.toLong(pageSizeFromScan); - } else { - pageSizeInMs = - conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, - QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS); - } - } + pageSizeMs = getPageSizeMsForRegionScanner(scan); ts = scan.getTimeRange().getMax(); boolean localIndexScan = ScanUtil.isLocalIndex(scan); @@ -566,6 +557,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { // since this is an indication of whether or not there are more values after the // ones returned hasMore = innerScanner.nextRaw(results); + if (isDummy(results)) { + if (!hasAny) { + resultsToReturn.addAll(results); + return true; + } + break; + } if (!results.isEmpty()) { lastCell = results.get(0); result.setKeyValues(results); @@ -606,8 +604,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { aggregators.aggregate(rowAggregators, result); hasAny = true; } - } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs); - + } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs); if (!mutations.isEmpty()) { annotateAndCommit(mutations); } @@ -621,7 +618,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { } catch (DataExceedsCapacityException e) { throw new DoNotRetryIOException(e.getMessage(), e); } catch (Throwable e) { - LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region " + LOGGER.error("Exception in UngroupedAggregateRegionScanner for region " + region.getRegionInfo().getRegionNameAsString(), e); throw e; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java index 2402e62..0f7a190 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java @@ -97,4 +97,15 @@ public class DelegateFilter extends FilterBase { public byte[] toByteArray() throws IOException { return delegate.toByteArray(); } + + @Override + public void setReversed(boolean reversed) { + delegate.setReversed(reversed); + } + + @Override + public boolean isReversed() { + return delegate.isReversed(); + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java new file mode 100644 index 0000000..e46cfa0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; +import org.apache.phoenix.util.EnvironmentEdgeManager; + +/** + * This filter overrides the behavior of the delegate so that the scan does not take longer than pageSizeMs; + * when the time limit is reached, it records the row key at which the scan stopped. + */ +public class PagedFilter extends FilterBase implements Writable { + private enum State { + INITIAL, STARTED, TIME_TO_STOP, STOPPED + } + State state; + private long pageSizeMs; + private long startTime; + private byte[] rowKeyAtStop; + private Filter delegate = null; + + public PagedFilter() { + init(); + } + + public PagedFilter(Filter delegate, long pageSizeMs) { + init(); + this.delegate = delegate; + this.pageSizeMs = pageSizeMs; + } + + public Filter getDelegateFilter() { + return delegate; + } + + public void setDelegateFilter(Filter delegate) { + this.delegate = delegate; + } + + public byte[] getRowKeyAtStop() { + if (rowKeyAtStop != null) { + return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length); + } + return null; + } + + public boolean isStopped() { + return state == State.STOPPED; + } + + public void init() { + state = State.INITIAL; + rowKeyAtStop = null; + } + + public void resetStartTime() { + if (state == State.STARTED) { + init(); + } + } + + @Override + public void reset() throws IOException { + if (state == State.INITIAL) { + startTime = EnvironmentEdgeManager.currentTimeMillis(); + state = State.STARTED; + } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + state = State.TIME_TO_STOP; + } + if (delegate != null) { + delegate.reset(); + return; + } + super.reset(); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + if (delegate != null) { + return delegate.getNextCellHint(currentKV); + } + return super.getNextCellHint(currentKV); + } + + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + if (state == State.TIME_TO_STOP) { + if (rowKeyAtStop == null) { + rowKeyAtStop = new byte[length]; + Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length); + } + return true; + } + if (delegate != null) { + return delegate.filterRowKey(buffer, offset, length); + } + return super.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + if (state == State.TIME_TO_STOP && rowKeyAtStop != null) { + state = State.STOPPED; + return true; + } + if (delegate != null) { + return delegate.filterAllRemaining(); + } + return
super.filterAllRemaining(); + } + + @Override + public boolean hasFilterRow() { + return true; + } + + @Override + public boolean filterRow() throws IOException { + if (state == State.TIME_TO_STOP) { + return true; + } + if (delegate != null) { + return delegate.filterRow(); + } + return super.filterRow(); + } + + @Override + public Cell transformCell(Cell v) throws IOException { + if (delegate != null) { + return delegate.transformCell(v); + } + return super.transformCell(v); + } + + @Override + public void filterRowCells(List<Cell> kvs) throws IOException { + if (delegate != null) { + delegate.filterRowCells(kvs); + return; + } + super.filterRowCells(kvs); + } + + @Override + public void setReversed(boolean reversed) { + if (delegate != null) { + delegate.setReversed(reversed); + } + super.setReversed(reversed); + } + + @Override + public boolean isReversed() { + if (delegate != null) { + return delegate.isReversed(); + } + return super.isReversed(); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + if (delegate != null) { + return delegate.isFamilyEssential(name); + } + return super.isFamilyEssential(name); + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + if (delegate != null) { + return delegate.filterKeyValue(v); + } + return ReturnCode.INCLUDE; + } + + public static PagedFilter parseFrom(final byte[] pbBytes) throws DeserializationException { + try { + return (PagedFilter) Writables.getWritable(pbBytes, new PagedFilter()); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(pageSizeMs); + if (delegate != null) { + out.writeUTF(delegate.getClass().getName()); + byte[] b = delegate.toByteArray(); + out.writeInt(b.length); + out.write(b); + } else { + out.writeUTF(""); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + pageSizeMs = in.readLong(); + String className = in.readUTF(); + if (className.length() == 0) { + return; + } + Class<?> cls = null; + try { + cls = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new DoNotRetryIOException(e); + } + + Method m = null; + try { + m = cls.getDeclaredMethod("parseFrom", byte[].class); + } catch (NoSuchMethodException e) { + throw new DoNotRetryIOException(e); + } + int length = in.readInt(); + byte[] b = new byte[length]; + in.readFully(b); + try { + delegate = (Filter) m.invoke(null, b); + } catch (IllegalAccessException e) { + throw new DoNotRetryIOException(e); + } catch (InvocationTargetException e) { + throw new DoNotRetryIOException(e); + } + } + + @Override + public byte[] toByteArray() throws IOException { + return Writables.getBytes(this); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 12592c0..7720ae6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -26,8 +26,12 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_ import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer; import static
org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; +import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.IOException; +import java.sql.SQLException; import java.util.Iterator; import java.util.List; @@ -43,7 +47,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.Filter; @@ -54,7 +57,9 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.BaseRegionScanner; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.filter.PagedFilter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; @@ -94,7 +99,7 @@ import org.slf4j.LoggerFactory; * the verified version that is masked by the unverified version(s). * */ -public class GlobalIndexChecker extends BaseRegionObserver { +public class GlobalIndexChecker extends BaseScannerRegionObserver { private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexChecker.class); private HTableFactory hTableFactory; private GlobalIndexCheckerSource metricsSource; @@ -119,13 +124,11 @@ public class GlobalIndexChecker extends BaseRegionObserver { * An instance of this class is created for each scanner on an index * and used to verify individual rows and rebuild them if they are not valid */ - private class GlobalIndexScanner implements RegionScanner { + private class GlobalIndexScanner extends BaseRegionScanner { private RegionScanner scanner; - private RegionScanner deleteRowScanner; private long ageThreshold; private Scan scan; private Scan indexScan; - private Scan deleteRowScan; private Scan singleRowIndexScan; private Scan buildIndexScan = null; private Table dataHTable = null; @@ -143,11 +146,13 @@ public class GlobalIndexChecker extends BaseRegionObserver { private boolean restartScanDueToPageFilterRemoval = false; private boolean hasMore; private String indexName; + private long pageSizeMs; public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner, GlobalIndexCheckerSource metricsSource) throws IOException { + super(scanner); this.env = env; this.scan = scan; this.scanner = scanner; @@ -171,6 +176,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { "repairIndexRows: IndexMaintainer is not included in scan attributes for " + region.getRegionInfo().getTable().getNameAsString()); } + pageSizeMs = getPageSizeMsForRegionScanner(scan); } @Override @@ -185,6 +191,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { public boolean next(List<Cell> result, boolean raw) throws IOException { try { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); do { if (raw) { hasMore = scanner.nextRaw(result); @@ -194,9 +201,19 @@ public class GlobalIndexChecker extends 
BaseRegionObserver { if (result.isEmpty()) { break; } + if (isDummy(result)) { + return true; + } + Cell cell = result.get(0); if (verifyRowAndRepairIfNecessary(result)) { break; } + if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) { + byte[] rowKey = CellUtil.cloneRow(cell); + result.clear(); + getDummyResult(rowKey, result); + return true; + } // skip this row as it is invalid // if there is no more row, then result will be an empty list } while (hasMore); @@ -245,11 +262,6 @@ public class GlobalIndexChecker extends BaseRegionObserver { } @Override - public boolean isFilterDone() throws IOException { - return scanner.isFilterDone(); - } - - @Override public boolean reseek(byte[] row) throws IOException { return scanner.reseek(row); } @@ -297,6 +309,12 @@ public class GlobalIndexChecker extends BaseRegionObserver { private PageFilter removePageFilter(Scan scan) { Filter filter = scan.getFilter(); if (filter != null) { + if (filter instanceof PagedFilter) { + filter = ((PagedFilter) filter).getDelegateFilter(); + if (filter == null) { + return null; + } + } if (filter instanceof PageFilter) { scan.setFilter(null); return (PageFilter) filter; @@ -316,7 +334,6 @@ public class GlobalIndexChecker extends BaseRegionObserver { } buildIndexScan = new Scan(); indexScan = new Scan(scan); - deleteRowScan = new Scan(); singleRowIndexScan = new Scan(scan); byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME); dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName)); @@ -361,7 +378,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { if (restartScanDueToPageFilterRemoval) { scanner.close(); setStartRow(indexScan, indexRowKey, false); - scanner = region.getScanner(indexScan); + scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan); hasMore = true; // Set restartScanDueToPageFilterRemoval to false as we do not restart the scan unnecessarily next time restartScanDueToPageFilterRemoval = false; @@ -379,7 +396,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { deleteRowIfAgedEnough(indexRowKey, ts, false); // Open a new scanner starting from the row after the current row setStartRow(indexScan, indexRowKey, false); - scanner = region.getScanner(indexScan); + scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan); hasMore = true; // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is // sufficient to do that @@ -389,12 +406,15 @@ public class GlobalIndexChecker extends BaseRegionObserver { // code == RebuildReturnCode.INDEX_ROW_EXISTS.getValue() // Open a new scanner starting from the current row setStartRow(indexScan, indexRowKey, true); - scanner = region.getScanner(indexScan); + scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan); hasMore = scanner.next(row); if (row.isEmpty()) { // This means the index row has been deleted before opening the new scanner. return; } + if (isDummy(row)) { + return; + } // Check if the index row still exists after rebuild if (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(), indexRowKey, 0, indexRowKey.length) != 0) { @@ -406,7 +426,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // The row is "unverified".
Rewind the scanner and let the row be scanned again // so that it can be repaired scanner.close(); - scanner = region.getScanner(indexScan); + scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan); hasMore = true; row.clear(); return; @@ -430,7 +450,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // can be 1. In that case, we will get only one (i.e., the most recent) version instead of all versions setSingleRow(singleRowIndexScan, indexRowKey); singleRowIndexScan.setTimeRange(minTimestamp, ts); - RegionScanner singleRowScanner = region.getScanner(singleRowIndexScan); + RegionScanner singleRowScanner = ((BaseRegionScanner)delegate).getNewRegionScanner(singleRowIndexScan); row.clear(); singleRowScanner.next(row); singleRowScanner.close(); @@ -442,6 +462,9 @@ public class GlobalIndexChecker extends BaseRegionObserver { // possibly by compaction return; } + if (isDummy(row)) { + return; + } if (verifyRowAndRemoveEmptyColumn(row)) { // The index row status is "verified". This row is good to return to the client. We are done here. return; @@ -587,11 +610,13 @@ public class GlobalIndexChecker extends BaseRegionObserver { } @Override - public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Scan scan, RegionScanner s) throws IOException { - if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) { - return s; - } + protected boolean isRegionObserverFor(Scan scan) { + return scan.getAttribute(CHECK_VERIFY_COLUMN) != null; + } + + @Override + protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, + final RegionScanner s) throws IOException, SQLException { return new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index 26114dd..d54a785 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -19,6 +19,9 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan; +import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -157,13 +160,14 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { - innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, + innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs, p, j, tenantId, useQualifierAsIndex, useNewValueColumnQualifier); } if (scanOffset != null) { innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator( - new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset), + new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), + scanOffset, getPageSizeMsForRegionScanner(scan)), scan.getAttribute(QueryConstants.LAST_SCAN) != null); } boolean spoolingEnabled = @@ -219,7 +223,7 @@ public class NonAggregateRegionScannerFactory
extends RegionScannerFactory { PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme); return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled, - thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize); + thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan)); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -373,11 +377,13 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { if (isFilterDone()) { return false; } - - for (int i = 0; i < tuple.size(); i++) { - results.add(tuple.getValue(i)); + if (isDummy(tuple)) { + getDummyResult(results); + } else { + for (int i = 0; i < tuple.size(); i++) { + results.add(tuple.getValue(i)); + } } - tuple = iterator.next(); return !isFilterDone(); } catch (Throwable t) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java index 5c5a6d3..3eecfc8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java @@ -23,6 +23,10 @@ import java.util.List; import org.apache.phoenix.compile.ExplainPlanAttributes .ExplainPlanAttributesBuilder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EnvironmentEdgeManager; + +import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getDummyTuple; /** * Iterates through tuples up to a limit @@ -32,17 +36,27 @@ import org.apache.phoenix.schema.tuple.Tuple; public class OffsetResultIterator extends DelegateResultIterator { private int rowCount; private int offset; + private long pageSizeMs = Long.MAX_VALUE; public OffsetResultIterator(ResultIterator delegate, Integer offset) { super(delegate); this.offset = offset == null ? 
-1 : offset; } + public OffsetResultIterator(ResultIterator delegate, Integer offset, long pageSizeMs) { + this(delegate, offset); + this.pageSizeMs = pageSizeMs; + } @Override public Tuple next() throws SQLException { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (rowCount < offset) { - if (super.next() == null) { return null; } + Tuple tuple = super.next(); + if (tuple == null) { return null; } rowCount++; + if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + return getDummyTuple(tuple); + } } return super.next(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index a433759..13d75ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -19,13 +19,14 @@ package org.apache.phoenix.iterate; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkPositionIndex; +import static org.apache.phoenix.util.ScanUtil.getDummyTuple; +import static org.apache.phoenix.util.ScanUtil.isDummy; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Queue; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; @@ -36,9 +37,9 @@ import org.apache.phoenix.compile.ExplainPlanAttributes import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; -import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; @@ -145,7 +146,10 @@ public class OrderedResultIterator implements PeekingResultIterator { private final long estimatedByteSize; private PeekingResultIterator resultIterator; + private boolean resultIteratorReady = false; + private Tuple dummyTuple = null; private long byteSize; + private long pageSizeMs; protected ResultIterator getDelegate() { return delegate; @@ -153,7 +157,7 @@ public class OrderedResultIterator implements PeekingResultIterator { public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) { - this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0); + this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0, Long.MAX_VALUE); } public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, @@ -162,8 +166,14 @@ public class OrderedResultIterator implements PeekingResultIterator { } public OrderedResultIterator(ResultIterator delegate, + List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, + long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) { + this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, estimatedRowSize, Long.MAX_VALUE); + } + + public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, - long 
thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) { + long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize, long pageSizeMs) { checkArgument(!orderByExpressions.isEmpty()); this.delegate = delegate; this.orderByExpressions = orderByExpressions; @@ -187,6 +197,7 @@ public class OrderedResultIterator implements PeekingResultIterator { assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + this.offset); this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * estimatedEntrySize; + this.pageSizeMs = pageSizeMs; } public Integer getLimit() { @@ -244,11 +255,17 @@ public class OrderedResultIterator implements PeekingResultIterator { @Override public Tuple next() throws SQLException { - return getResultIterator().next(); + getResultIterator(); + if (!resultIteratorReady) { + return dummyTuple; + } + return resultIterator.next(); } private PeekingResultIterator getResultIterator() throws SQLException { - if (resultIterator != null) { + if (resultIteratorReady) { + // The results have already been ordered, so the result iterator + // is ready to iterate over them return resultIterator; } @@ -256,11 +273,17 @@ public class OrderedResultIterator implements PeekingResultIterator { List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION)); final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions); try { - final SizeAwareQueue<ResultEntry> queueEntries = - PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled, - thresholdBytes); - resultIterator = new RecordPeekingResultIterator(queueEntries); + if (resultIterator == null) { + resultIterator = new RecordPeekingResultIterator(PhoenixQueues.newResultEntrySortedQueue(comparator, + limit, spoolingEnabled, thresholdBytes)); + } + final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries(); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); for (Tuple result = delegate.next(); result != null; result = delegate.next()) { + if (isDummy(result)) { + dummyTuple = result; + return resultIterator; + } int pos = 0; ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys]; for (Expression expression : expressions) { @@ -270,7 +293,12 @@ public class OrderedResultIterator implements PeekingResultIterator { sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ?
sortKey : null; } queueEntries.add(new ResultEntry(sortKeys, result)); + if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + dummyTuple = getDummyTuple(result); + return resultIterator; + } } + resultIteratorReady = true; this.byteSize = queueEntries.getByteSize(); } catch (IOException e) { ServerUtil.createIOException(e.getMessage(), e); } @@ -336,6 +364,10 @@ public class OrderedResultIterator implements PeekingResultIterator { this.queueEntries = queueEntries; } + public SizeAwareQueue<ResultEntry> getQueueEntries() { + return queueEntries; + } + @Override public Tuple next() throws SQLException { ResultEntry entry = queueEntries.poll(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index c9d471b..5b587df 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -20,10 +20,16 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; +import static org.apache.phoenix.util.ScanUtil.getDummyResult; +import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner; +import static org.apache.phoenix.util.ScanUtil.isDummy; import com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; @@ -54,6 +60,7 @@ import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -118,6 +125,7 @@ public abstract class RegionScannerFactory { private HRegionInfo regionInfo = env.getRegionInfo(); private byte[] actualStartKey = getActualStartKey(); private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); + final long pageSizeMs = getPageSizeMsForRegionScanner(scan); // Get the actual scan start row of local index. This will be used to compare the row // key of the results less than scan start row when there are references.
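The OrderedResultIterator change above is the subtlest use of paging in this patch: the sort queue must survive across next() calls so that a sort interrupted by the page-size deadline can resume where it left off. A minimal sketch of that resumable-fill pattern, with illustrative placeholder names (ResumableSorter, fillPage, PAGE_SIZE_MS are not Phoenix APIs):

    import java.util.Iterator;
    import java.util.PriorityQueue;
    import java.util.Queue;

    class ResumableSorter {
        // The queue survives across fillPage() calls, just as OrderedResultIterator
        // now keeps its RecordPeekingResultIterator and its queue between pages.
        private final Queue<Long> sorted = new PriorityQueue<>();
        private boolean ready = false;
        private static final long PAGE_SIZE_MS = 100; // illustrative time budget

        // Returns true when all input has been consumed and the sort is complete;
        // false means the time budget ran out and the caller should surface a
        // dummy row to the client and call fillPage() again later.
        boolean fillPage(Iterator<Long> input) {
            long start = System.currentTimeMillis();
            while (input.hasNext()) {
                sorted.add(input.next());
                if (System.currentTimeMillis() - start >= PAGE_SIZE_MS) {
                    return false; // checkpoint: keep the queue, resume on the next call
                }
            }
            ready = true;
            return true;
        }

        boolean isReady() {
            return ready;
        }
    }

Only once the ready flag (resultIteratorReady in the patch) is set does next() start draining the ordered queue; until then each call returns the saved dummy tuple to keep the RPC alive.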
@@ -129,7 +137,11 @@ public abstract class RegionScannerFactory { @Override public boolean next(List<Cell> results) throws IOException { try { - return s.next(results); + boolean next = s.next(results); + if (isDummy(results)) { + return true; + } + return next; } catch (Throwable t) { ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t); return false; // impossible @@ -170,6 +182,9 @@ public abstract class RegionScannerFactory { public boolean nextRaw(List<Cell> result) throws IOException { try { boolean next = s.nextRaw(result); + if (isDummy(result)) { + return true; + } Cell arrayElementCell = null; if (result.size() == 0) { return next; @@ -182,7 +197,7 @@ public abstract class RegionScannerFactory { if(actualStartKey!=null) { next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result, null, arrayElementCell); - if (result.isEmpty()) { + if (result.isEmpty() || isDummy(result)) { return next; } } @@ -290,8 +305,15 @@ public abstract class RegionScannerFactory { ScannerContext scannerContext, Cell arrayElementCell) throws IOException { boolean next = true; Cell firstCell = result.get(0); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) { + if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + byte[] rowKey = CellUtil.cloneRow(result.get(0)); + result.clear(); + getDummyResult(rowKey, result); + return true; + } result.clear(); if(scannerContext == null) { next = s.nextRaw(result); @@ -301,6 +323,9 @@ public abstract class RegionScannerFactory { if (result.isEmpty()) { return next; } + if (isDummy(result)) { + return true; + } if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); arrayElementCell = result.get(arrayElementCellPosition); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index a5a40e2..caf91c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -23,16 +23,20 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ServerUtil; +import static org.apache.phoenix.util.ScanUtil.isDummy; + public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; @@ -63,6 +67,9 @@ public class RegionScannerResultIterator extends BaseResultIterator { if (!hasMore && results.isEmpty()) { return null; } + if (isDummy(results)) { + return new ResultTuple(Result.create(results)); + } // We instantiate a new tuple 
because in all cases currently we hang on to it // (i.e. to compute and hold onto the TopN). Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 4cafb88..46cbf96 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -160,7 +160,7 @@ public class ScanningResultIterator implements ResultIterator { public Tuple next() throws SQLException { try { Result result = scanner.next(); - while (result != null && isDummy(result)) { + while (result != null && (result.isEmpty() || isDummy(result))) { result = scanner.next(); } if (result == null) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 2ae9223..da5edbd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -26,7 +26,6 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED; -import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -37,10 +36,10 @@ import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; @@ -55,6 +54,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; @@ -139,7 +139,15 @@ public class TableResultIterator implements ResultIterator { .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection()); - scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES); + long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps() + .getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1); + if (pageSizeMs == -1) { + // Use half of the HBase RPC timeout value as the server page size to make sure that the HBase + // region server will be able to send a heartbeat message to the client before the client times out +
pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps() + .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5); + } + scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(pageSizeMs)); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index dab3a96..faade11 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -330,11 +330,10 @@ public interface QueryServices extends SQLCloseable { public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled"; // The number of index rows to be rebuilt in one RPC call public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows"; - // The number of rows to be scanned in one RPC call - public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms"; - public static final String GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.grouped.aggregate_page_size_in_ms"; // Flag indicating that server side masking of ttl expired rows is enabled. public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled"; + // The time limit on the amount of work to be done in one RPC call + public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms"; // Before 4.15 when we created a view we included the parent table column metadata in the view // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 289e3fc..9c6ff29 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -342,8 +342,6 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024; - public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second - public static final long DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false; public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java index db3647d..54e97a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java @@ -20,6 +20,7 @@ package org.apache.phoenix.schema.tuple; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; +import
static org.apache.phoenix.util.ScanUtil.isDummy; import java.util.Collection; import java.util.ConcurrentModificationException; @@ -137,6 +138,12 @@ public class EncodedColumnQualiferCellsList implements List<Cell> { if (e == null) { throw new NullPointerException(); } + if (isDummy(e)) { + array[0] = e; + firstNonNullElementIdx = 0; + numNonNullElements = 1; + return true; + } int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java index 3774837..cd17527 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java @@ -35,7 +35,6 @@ import org.apache.phoenix.util.KeyValueUtil; public class ResultTuple extends BaseTuple { private final Result result; public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList())); - public ResultTuple(Result result) { this.result = result; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 28e58a0..a9c53e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -49,6 +49,7 @@ import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; +import org.apache.phoenix.filter.PagedFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; @@ -70,6 +71,8 @@ import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; import org.slf4j.Logger; @@ -676,7 +679,7 @@ public class ScanUtil { // Start/stop row must be swapped if scan is being done in reverse public static void setupReverseScan(Scan scan) { - if (isReversed(scan)) { + if (isReversed(scan) && !scan.isReversed()) { byte[] newStartRow = getReversedRow(scan.getStartRow()); byte[] newStopRow = getReversedRow(scan.getStopRow()); scan.setStartRow(newStopRow); @@ -761,6 +764,12 @@ public class ScanUtil { if (filter == null) { return; } + if (filter instanceof PagedFilter) { + filter = ((PagedFilter) filter).getDelegateFilter(); + if (filter == null) { + return; + } + } if (filter instanceof FilterList) { FilterList filterList = (FilterList)filter; for (Filter childFilter : filterList.getFilters()) { @@ -1206,6 +1215,15 @@ public class ScanUtil { if (!ScanUtil.isDeleteTTLExpiredRows(scan)) { scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES); } + if (ScanUtil.isLocalIndex(scan)) { + byte[] actualStartRow = scan.getAttribute(SCAN_ACTUAL_START_ROW) != null ? 
+ scan.getAttribute(SCAN_ACTUAL_START_ROW) : + HConstants.EMPTY_BYTE_ARRAY; + ScanUtil.setLocalIndexAttributes(scan, 0, + actualStartRow, + HConstants.EMPTY_BYTE_ARRAY, + scan.getStartRow(), scan.getStopRow()); + } addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName); } } @@ -1222,22 +1240,85 @@ public class ScanUtil { getDummyResult(EMPTY_BYTE_ARRAY, result); } + public static Tuple getDummyTuple(byte[] rowKey) { + List<Cell> result = new ArrayList<Cell>(1); + getDummyResult(rowKey, result); + return new ResultTuple(Result.create(result)); + } + + public static Tuple getDummyTuple() { + List<Cell> result = new ArrayList<Cell>(1); + getDummyResult(result); + return new ResultTuple(Result.create(result)); + } + + public static Tuple getDummyTuple(Tuple tuple) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + tuple.getKey(ptr); + return getDummyTuple(ptr.copyBytes()); + } + + public static boolean isDummy(Cell cell) { + return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); + } + public static boolean isDummy(Result result) { - // Check if the result is a dummy result if (result.rawCells().length != 1) { return false; } Cell cell = result.rawCells()[0]; - return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); + return isDummy(cell); } public static boolean isDummy(List<Cell> result) { - // Check if the result is a dummy result if (result.size() != 1) { return false; } Cell cell = result.get(0); - return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); + return isDummy(cell); + } + + public static boolean isDummy(Tuple tuple) { + if (tuple instanceof ResultTuple) { + return isDummy(((ResultTuple) tuple).getResult()); + } + return false; + } + + public static PagedFilter getPhoenixPagedFilter(Scan scan) { + Filter filter = scan.getFilter(); + if (filter instanceof PagedFilter) { + return (PagedFilter) filter; + } + return null; + } + + /** + * + * The server page size, expressed in ms, is the maximum time we want the Phoenix server code to spend + * on each ResultScanner iteration. Each ResultScanner#next() call can be translated into one or more + * HBase RegionScanner#next() calls by a Phoenix RegionScanner object running in a loop. To ensure that the total + * time spent by the Phoenix server code will not exceed the configured page size value, SERVER_PAGE_SIZE_MS, + * the loop time in a Phoenix region scanner is limited to 0.6 * SERVER_PAGE_SIZE_MS, and the time for + * each HBase RegionScanner#next() call, which is controlled by PagedFilter, is limited to 0.3 * SERVER_PAGE_SIZE_MS.
+ * +  */ + private static long getPageSizeMs(Scan scan) { + long pageSizeMs = Long.MAX_VALUE; + byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS); + if (pageSizeMsBytes != null) { + pageSizeMs = Bytes.toLong(pageSizeMsBytes); + } + return pageSizeMs; + } + + public static long getPageSizeMsForRegionScanner(Scan scan) { + return (long) (getPageSizeMs(scan) * 0.6); + } + + public static long getPageSizeMsForFilter(Scan scan) { + return (long) (getPageSizeMs(scan) * 0.3); } /** diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 05773fb..f326f9b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -638,11 +638,8 @@ public abstract class BaseTest { // This results in processing one row at a time in each next operation of the aggregate region // scanner, i.e., one-row pages. In other words, a 0 ms page size allows only one row to be processed // per page; a 0 ms page is equivalent to a one-row page - if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) { - conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0); - } - if (conf.getLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) { - conf.setLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0); + if (conf.getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0) == 0) { + conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0); } return conf; }
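Putting the pieces together: when a server-side scanner exhausts its page budget, it returns a single "dummy" cell whose column family and qualifier are both empty, keyed at the row where it stopped; the client silently skips such results and issues another next(), which resumes from that row key. A hedged sketch of the client-side half of this contract (the helper below is illustrative, not part of the patch; it assumes the empty-family/empty-qualifier convention of ScanUtil.isDummy above):

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;

    final class PagingAwareScan {
        // A result is a "dummy" when it carries exactly one cell whose column
        // family and qualifier are both empty, mirroring ScanUtil.isDummy.
        private static boolean isDummy(Result r) {
            return r.rawCells().length == 1
                    && CellUtil.matchingColumn(r.rawCells()[0],
                            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
        }

        // Returns the next real row, transparently consuming dummy heartbeats;
        // each next() lets the server resume from the dummy's row key.
        static Result nextRealRow(ResultScanner scanner) throws java.io.IOException {
            Result r = scanner.next();
            while (r != null && (r.isEmpty() || isDummy(r))) {
                r = scanner.next();
            }
            return r;
        }
    }

This mirrors the loop that the ScanningResultIterator hunk above installs in Phoenix's own client path, so dummy rows never reach user code.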