PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8d014cba Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8d014cba Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8d014cba Branch: refs/heads/4.0 Commit: 8d014cbafe2b397c3cca5084abf80be759504e77 Parents: 221ff6b Author: Andrew Purtell <apurt...@apache.org> Authored: Sat Feb 21 20:34:08 2015 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Feb 21 20:34:21 2015 -0800 ---------------------------------------------------------------------- .../GroupedAggregateRegionObserver.java | 96 ++++---- .../UngroupedAggregateRegionObserver.java | 246 ++++++++++--------- .../phoenix/index/PhoenixIndexBuilder.java | 18 +- .../iterate/RegionScannerResultIterator.java | 36 +-- 4 files changed, 216 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 8b59b85..0984b06 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -375,7 +375,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { * @param limit TODO */ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, - final RegionScanner s, final List<Expression> expressions, + final RegionScanner scanner, final List<Expression> expressions, final ServerAggregators aggregators, long limit) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan @@ -410,28 +410,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { HRegion region = c.getEnvironment().getRegion(); region.startRegionOperation(); try { - do { - List<Cell> results = new ArrayList<Cell>(); - // Results are potentially returned even when the return - // value of s.next is false - // since this is an indication of whether or not there are - // more values after the - // ones returned - hasMore = s.nextRaw(results); - if (!results.isEmpty()) { - result.setKeyValues(results); - ImmutableBytesWritable key = + synchronized (scanner) { + do { + List<Cell> results = new ArrayList<Cell>(); + // Results are potentially returned even when the return + // value of s.next is false + // since this is an indication of whether or not there are + // more values after the + // ones returned + hasMore = scanner.nextRaw(results); + if (!results.isEmpty()) { + result.setKeyValues(results); + ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(result, expressions); - Aggregator[] rowAggregators = groupByCache.cache(key); - // Aggregate values here - aggregators.aggregate(rowAggregators, result); - } - } while (hasMore && groupByCache.size() < limit); + Aggregator[] rowAggregators = groupByCache.cache(key); + // Aggregate values here + aggregators.aggregate(rowAggregators, result); + } + } while (hasMore && groupByCache.size() < limit); + } } finally { region.closeRegionOperation(); } - RegionScanner regionScanner = groupByCache.getScanner(s); + RegionScanner regionScanner = groupByCache.getScanner(scanner); // Do not sort here, but sort back on the client instead // The reason is that if the scan ever extends beyond a region @@ -453,7 +455,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { * @throws IOException */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, - final Scan scan, final RegionScanner s, final List<Expression> expressions, + final Scan scan, final RegionScanner scanner, final List<Expression> expressions, final ServerAggregators aggregators, final long limit) throws IOException { if (logger.isDebugEnabled()) { @@ -466,12 +468,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { @Override public HRegionInfo getRegionInfo() { - return s.getRegionInfo(); + return scanner.getRegionInfo(); } @Override public void close() throws IOException { - s.close(); + scanner.close(); } @Override @@ -488,32 +490,36 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { HRegion region = c.getEnvironment().getRegion(); region.startRegionOperation(); try { - do { - List<Cell> kvs = new ArrayList<Cell>(); - // Results are potentially returned even when the return - // value of s.next is false - // since this is an indication of whether or not there - // are more values after the - // ones returned - hasMore = s.nextRaw(kvs); - if (!kvs.isEmpty()) { - result.setKeyValues(kvs); - key = TupleUtil.getConcatenatedValue(result, expressions); - aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; - if (!aggBoundary) { - aggregators.aggregate(rowAggregators, result); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs + synchronized (scanner) { + do { + List<Cell> kvs = new ArrayList<Cell>(); + // Results are potentially returned even when the return + // value of s.next is false + // since this is an indication of whether or not there + // are more values after the + // ones returned + hasMore = scanner.nextRaw(kvs); + if (!kvs.isEmpty()) { + result.setKeyValues(kvs); + key = TupleUtil.getConcatenatedValue(result, expressions); + aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; + if (!aggBoundary) { + aggregators.aggregate(rowAggregators, result); + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations( + "Row passed filters: " + kvs + ", aggregated values: " - + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan))); + + Arrays.asList(rowAggregators), + ScanUtil.getCustomAnnotations(scan))); + } + currentKey = key; } - currentKey = key; } - } - atLimit = rowCount + countOffset >= limit; - // Do rowCount + 1 b/c we don't have to wait for a complete - // row in the case of a DISTINCT with a LIMIT - } while (hasMore && !aggBoundary && !atLimit); + atLimit = rowCount + countOffset >= limit; + // Do rowCount + 1 b/c we don't have to wait for a complete + // row in the case of a DISTINCT with a LIMIT + } while (hasMore && !aggBoundary && !atLimit); + } } finally { region.closeRegionOperation(); } @@ -555,7 +561,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { @Override public long getMaxResultSize() { - return s.getMaxResultSize(); + return scanner.getMaxResultSize(); } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a3b2faa..71c4dc6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -253,128 +253,152 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ final RegionScanner innerScanner = theScanner; region.startRegionOperation(); try { - do { - List<Cell> results = new ArrayList<Cell>(); - // Results are potentially returned even when the return value of s.next is false - // since this is an indication of whether or not there are more values after the - // ones returned - hasMore = innerScanner.nextRaw(results); - if(stats != null) { - stats.collectStatistics(results); - } - - if (!results.isEmpty()) { - rowCount++; - result.setKeyValues(results); - try { - if (buildLocalIndex) { - for (IndexMaintainer maintainer : indexMaintainers) { - if (!results.isEmpty()) { - result.getKey(ptr); - ValueGetter valueGetter = maintainer.createGetterFromKeyValues(ImmutableBytesPtr.copyBytesIfNecessary(ptr),results); - Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey(), c.getEnvironment().getRegion().getEndKey()); - indexMutations.add(put); + synchronized (innerScanner) { + do { + List<Cell> results = new ArrayList<Cell>(); + // Results are potentially returned even when the return value of s.next is false + // since this is an indication of whether or not there are more values after the + // ones returned + hasMore = innerScanner.nextRaw(results); + if (stats != null) { + stats.collectStatistics(results); + } + if (!results.isEmpty()) { + rowCount++; + result.setKeyValues(results); + try { + if (buildLocalIndex) { + for (IndexMaintainer maintainer : indexMaintainers) { + if (!results.isEmpty()) { + result.getKey(ptr); + ValueGetter valueGetter = + maintainer.createGetterFromKeyValues( + ImmutableBytesPtr.copyBytesIfNecessary(ptr), + results); + Put put = maintainer.buildUpdateMutation(kvBuilder, + valueGetter, ptr, ts, + c.getEnvironment().getRegion().getStartKey(), + c.getEnvironment().getRegion().getEndKey()); + indexMutations.add(put); + } } - } - result.setKeyValues(results); - } else if (isDelete) { - // FIXME: the version of the Delete constructor without the lock args was introduced - // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version - // of the client. - Cell firstKV = results.get(0); - Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(), - firstKV.getRowLength(),ts); - mutations.add(delete); - } else if (isUpsert) { - Arrays.fill(values, null); - int i = 0; - List<PColumn> projectedColumns = projectedTable.getColumns(); - for (; i < projectedTable.getPKColumns().size(); i++) { - Expression expression = selectExpressions.get(i); - if (expression.evaluate(result, ptr)) { - values[i] = ptr.copyBytes(); - // If SortOrder from expression in SELECT doesn't match the - // column being projected into then invert the bits. - if (expression.getSortOrder() != projectedColumns.get(i).getSortOrder()) { - SortOrder.invert(values[i], 0, values[i], 0, values[i].length); + result.setKeyValues(results); + } else if (isDelete) { + // FIXME: the version of the Delete constructor without the lock + // args was introduced in 0.94.4, thus if we try to use it here + // we can no longer use the 0.94.2 version of the client. + Cell firstKV = results.get(0); + Delete delete = new Delete(firstKV.getRowArray(), + firstKV.getRowOffset(), firstKV.getRowLength(),ts); + mutations.add(delete); + } else if (isUpsert) { + Arrays.fill(values, null); + int i = 0; + List<PColumn> projectedColumns = projectedTable.getColumns(); + for (; i < projectedTable.getPKColumns().size(); i++) { + Expression expression = selectExpressions.get(i); + if (expression.evaluate(result, ptr)) { + values[i] = ptr.copyBytes(); + // If SortOrder from expression in SELECT doesn't match the + // column being projected into then invert the bits. + if (expression.getSortOrder() != + projectedColumns.get(i).getSortOrder()) { + SortOrder.invert(values[i], 0, values[i], 0, + values[i].length); + } } } - } - projectedTable.newKey(ptr, values); - PRow row = projectedTable.newRow(kvBuilder, ts, ptr); - for (; i < projectedColumns.size(); i++) { - Expression expression = selectExpressions.get(i); - if (expression.evaluate(result, ptr)) { - PColumn column = projectedColumns.get(i); - Object value = expression.getDataType().toObject(ptr, column.getSortOrder()); - // We are guaranteed that the two column will have the same type. - if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), - expression.getMaxLength(), expression.getScale(), - column.getMaxLength(), column.getScale())) { - throw new ValueTypeIncompatibleException(column.getDataType(), - column.getMaxLength(), column.getScale()); + projectedTable.newKey(ptr, values); + PRow row = projectedTable.newRow(kvBuilder, ts, ptr); + for (; i < projectedColumns.size(); i++) { + Expression expression = selectExpressions.get(i); + if (expression.evaluate(result, ptr)) { + PColumn column = projectedColumns.get(i); + Object value = expression.getDataType() + .toObject(ptr, column.getSortOrder()); + // We are guaranteed that the two column will have the + // same type. + if (!column.getDataType().isSizeCompatible(ptr, value, + column.getDataType(), expression.getMaxLength(), + expression.getScale(), column.getMaxLength(), + column.getScale())) { + throw new ValueTypeIncompatibleException( + column.getDataType(), column.getMaxLength(), + column.getScale()); + } + column.getDataType().coerceBytes(ptr, value, + expression.getDataType(), expression.getMaxLength(), + expression.getScale(), expression.getSortOrder(), + column.getMaxLength(), column.getScale(), + column.getSortOrder()); + byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); + row.setValue(column, bytes); } - column.getDataType().coerceBytes(ptr, value, expression.getDataType(), - expression.getMaxLength(), expression.getScale(), expression.getSortOrder(), - column.getMaxLength(), column.getScale(), column.getSortOrder()); - byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); - row.setValue(column, bytes); + } + for (Mutation mutation : row.toRowMutations()) { + mutations.add(mutation); + } + for (i = 0; i < selectExpressions.size(); i++) { + selectExpressions.get(i).reset(); + } + } else if (deleteCF != null && deleteCQ != null) { + // No need to search for delete column, since we project only it + // if no empty key value is being set + if (emptyCF == null || + result.getValue(deleteCF, deleteCQ) != null) { + Delete delete = new Delete(results.get(0).getRowArray(), + results.get(0).getRowOffset(), + results.get(0).getRowLength()); + delete.deleteColumns(deleteCF, deleteCQ, ts); + mutations.add(delete); } } - for (Mutation mutation : row.toRowMutations()) { - mutations.add(mutation); - } - for (i = 0; i < selectExpressions.size(); i++) { - selectExpressions.get(i).reset(); + if (emptyCF != null) { + /* + * If we've specified an emptyCF, then we need to insert an empty + * key value "retroactively" for any key value that is visible at + * the timestamp that the DDL was issued. Key values that are not + * visible at this timestamp will not ever be projected up to + * scans past this timestamp, so don't need to be considered. + * We insert one empty key value per row per timestamp. + */ + Set<Long> timeStamps = + Sets.newHashSetWithExpectedSize(results.size()); + for (Cell kv : results) { + long kvts = kv.getTimestamp(); + if (!timeStamps.contains(kvts)) { + Put put = new Put(kv.getRowArray(), kv.getRowOffset(), + kv.getRowLength()); + put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, + ByteUtil.EMPTY_BYTE_ARRAY); + mutations.add(put); + } + } } - } else if (deleteCF != null && deleteCQ != null) { - // No need to search for delete column, since we project only it - // if no empty key value is being set - if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) { - Delete delete = new Delete(results.get(0).getRowArray(), results.get(0).getRowOffset(), - results.get(0).getRowLength()); - delete.deleteColumns(deleteCF, deleteCQ, ts); - mutations.add(delete); + // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config + if (!mutations.isEmpty() && batchSize > 0 && + mutations.size() % batchSize == 0) { + commitBatch(region, mutations, indexUUID); + mutations.clear(); } - } - if (emptyCF != null) { - /* - * If we've specified an emptyCF, then we need to insert an empty - * key value "retroactively" for any key value that is visible at - * the timestamp that the DDL was issued. Key values that are not - * visible at this timestamp will not ever be projected up to - * scans past this timestamp, so don't need to be considered. - * We insert one empty key value per row per timestamp. - */ - Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size()); - for (Cell kv : results) { - long kvts = kv.getTimestamp(); - if (!timeStamps.contains(kvts)) { - Put put = new Put(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); - put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY); - mutations.add(put); - } + // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config + if (!indexMutations.isEmpty() && batchSize > 0 && + indexMutations.size() % batchSize == 0) { + commitIndexMutations(c, region, indexMutations); } + } catch (ConstraintViolationException e) { + // Log and ignore in count + logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + + region.getRegionNameAsString() + " with values " + + SchemaUtil.toString(values), + ScanUtil.getCustomAnnotations(scan)), e); + continue; } - // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config - if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) { - commitBatch(region, mutations, indexUUID); - mutations.clear(); - } - // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config - if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) { - commitIndexMutations(c, region, indexMutations); - } - - } catch (ConstraintViolationException e) { - // Log and ignore in count - logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), ScanUtil.getCustomAnnotations(scan)), e); - continue; + aggregators.aggregate(rowAggregators, result); + hasAny = true; } - aggregators.aggregate(rowAggregators, result); - hasAny = true; - } - } while (hasMore); + } while (hasMore); + } } finally { try { if (stats != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index a0bd7c5..b89c807 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -78,14 +78,16 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { // Run through the scanner using internal nextRaw method region.startRegionOperation(); try { - boolean hasMore; - do { - List<Cell> results = Lists.newArrayList(); - // Results are potentially returned even when the return value of s.next is false - // since this is an indication of whether or not there are more values after the - // ones returned - hasMore = scanner.nextRaw(results); - } while (hasMore); + synchronized (scanner) { + boolean hasMore; + do { + List<Cell> results = Lists.newArrayList(); + // Results are potentially returned even when the return value of s.next is + // false since this is an indication of whether or not there are more values + // after the ones returned + hasMore = scanner.nextRaw(results); + } while (hasMore); + } } finally { try { scanner.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index bff0936..88e141a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -38,23 +38,27 @@ public class RegionScannerResultIterator extends BaseResultIterator { @Override public Tuple next() throws SQLException { - try { - // TODO: size - List<Cell> results = new ArrayList<Cell>(); - // Results are potentially returned even when the return value of s.next is false - // since this is an indication of whether or not there are more values after the - // ones returned - boolean hasMore = scanner.nextRaw(results); - if (!hasMore && results.isEmpty()) { - return null; + // XXX: No access here to the region instance to enclose this with startRegionOperation / + // stopRegionOperation + synchronized (scanner) { + try { + // TODO: size + List<Cell> results = new ArrayList<Cell>(); + // Results are potentially returned even when the return value of s.next is false + // since this is an indication of whether or not there are more values after the + // ones returned + boolean hasMore = scanner.nextRaw(results); + if (!hasMore && results.isEmpty()) { + return null; + } + // We instantiate a new tuple because in all cases currently we hang on to it + // (i.e. to compute and hold onto the TopN). + MultiKeyValueTuple tuple = new MultiKeyValueTuple(); + tuple.setKeyValues(results); + return tuple; + } catch (IOException e) { + throw ServerUtil.parseServerException(e); } - // We instantiate a new tuple because in all cases currently we hang on to it (i.e. - // to compute and hold onto the TopN). - MultiKeyValueTuple tuple = new MultiKeyValueTuple(); - tuple.setKeyValues(results); - return tuple; - } catch (IOException e) { - throw ServerUtil.parseServerException(e); } }