http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 7905d34..bde3f78 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -17,68 +17,23 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; -import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; - import java.sql.SQLException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.filter.ColumnProjectionFilter; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; -import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.parse.HintNode.Hint; -import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.MetaDataClient; -import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTable.ViewType; -import org.apache.phoenix.schema.SaltingUtil; -import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.stats.GuidePostsInfo; -import org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -90,537 +45,43 @@ import com.google.common.collect.Lists; * * @since 0.1 */ -public class ParallelIterators extends ExplainTable implements ResultIterators { +public class ParallelIterators extends BaseResultIterators { 
private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class); - private final List<List<Scan>> scans; - private final List<KeyRange> splits; - private final PTableStats tableStats; - private final byte[] physicalTableName; - private final QueryPlan plan; + private static final String NAME = "PARALLEL"; private final ParallelIteratorFactory iteratorFactory; - public static interface ParallelIteratorFactory { - PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; - } - - private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min - private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; - - static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { - @Override - public KeyRange apply(HRegionLocation region) { - return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey()); - } - }; - - private PTable getTable() { - return plan.getTableRef().getTable(); - } - - private boolean useStats() { - Scan scan = context.getScan(); - boolean isPointLookup = context.getScanRanges().isPointLookup(); - /* - * Don't use guide posts if: - * 1) We're doing a point lookup, as HBase is fast enough at those - * to not need them to be further parallelized. TODO: pref test to verify - * 2) We're collecting stats, as in this case we need to scan entire - * regions worth of data to track where to put the guide posts. - */ - if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) { - return false; - } - return true; - } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) throws SQLException { - super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint()); - this.plan = plan; - StatementContext context = plan.getContext(); - TableRef tableRef = plan.getTableRef(); - PTable table = tableRef.getTable(); - FilterableStatement statement = plan.getStatement(); - RowProjector projector = plan.getProjector(); - physicalTableName = table.getPhysicalName().getBytes(); - tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; - Scan scan = context.getScan(); - if (projector.isProjectEmptyKeyValue()) { - Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); - // If nothing projected into scan and we only have one column family, just allow everything - // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to - // be quite a bit faster. - // Where condition columns also will get added into familyMap - // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. - if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() - && table.getColumnFamilies().size() == 1) { - // Project the one column family. We must project a column family since it's possible - // that there are other non declared column families that we need to ignore. - scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); - ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); - } else { - byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); - // Project empty key value unless the column family containing it has - // been projected in its entirety. 
- if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); - } - } - } else if (table.getViewType() == ViewType.MAPPED) { - // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the - // selected column values are returned back to client - for (PColumnFamily family : table.getColumnFamilies()) { - scan.addFamily(family.getName().getBytes()); - } - } - - // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. - if (perScanLimit != null) { - ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); - } - - doColumnProjectionOptimization(context, scan, table, statement); - + super(plan, perScanLimit); this.iteratorFactory = iteratorFactory; - this.scans = getParallelScans(); - List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); - for (List<Scan> scanList : scans) { - for (Scan aScan : scanList) { - splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow())); - } - } - this.splits = ImmutableList.copyOf(splitRanges); - } - - private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { - Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); - if (familyMap != null && !familyMap.isEmpty()) { - // columnsTracker contain cf -> qualifiers which should get returned. - Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = - new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>(); - Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - int referencedCfCount = familyMap.size(); - for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) { - if (!(familyMap.containsKey(whereCol.getFirst()))) { - referencedCfCount++; - } - } - boolean useOptimization; - if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) { - // Do not use the optimization - useOptimization = false; - } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) { - // Strictly use the optimization - useOptimization = true; - } else { - // when referencedCfCount is >1 and no Hints, we are not using the optimization - useOptimization = referencedCfCount == 1; - } - if (useOptimization) { - for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { - ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey()); - NavigableSet<byte[]> qs = entry.getValue(); - NavigableSet<ImmutableBytesPtr> cols = null; - if (qs != null) { - cols = new TreeSet<ImmutableBytesPtr>(); - for (byte[] q : qs) { - cols.add(new ImmutableBytesPtr(q)); - } - } - columnsTracker.put(cf, cols); - } - } - // Making sure that where condition CFs are getting scanned at HRS. - for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) { - if (useOptimization) { - if (!(familyMap.containsKey(whereCol.getFirst()))) { - scan.addFamily(whereCol.getFirst()); - conditionOnlyCfs.add(whereCol.getFirst()); - } - } else { - if (familyMap.containsKey(whereCol.getFirst())) { - // where column's CF is present. If there are some specific columns added against this CF, we - // need to ensure this where column also getting added in it. - // If the select was like select cf1.*, then that itself will select the whole CF. So no need to - // specifically add the where column. Adding that will remove the cf1.* stuff and only this - // where condition column will get returned! 
- NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst()); - // cols is null means the whole CF will get scanned. - if (cols != null) { - scan.addColumn(whereCol.getFirst(), whereCol.getSecond()); - } - } else { - // where column's CF itself is not present in family map. We need to add the column - scan.addColumn(whereCol.getFirst(), whereCol.getSecond()); - } - } - } - if (useOptimization && !columnsTracker.isEmpty()) { - for (ImmutableBytesPtr f : columnsTracker.keySet()) { - // This addFamily will remove explicit cols in scan familyMap and make it as entire row. - // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter - scan.addFamily(f.get()); - } - // We don't need this filter for aggregates, as we're not returning back what's - // in the scan in this case. We still want the other optimization that causes - // the ExplicitColumnTracker not to be used, though. - if (!(statement.isAggregate())) { - ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs)); - } - } - } - } - - public List<KeyRange> getSplits() { - return splits; - } - - public List<List<Scan>> getScans() { - return scans; - } - - private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) { - int nBoundaries = regionLocations.size() - 1; - List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries); - for (int i = 0; i < nBoundaries; i++) { - HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo(); - ranges.add(regionInfo.getEndKey()); - } - return ranges; - } - - private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) { - int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR); - // If we found an exact match, return the index+1, as the inclusiveKey will be contained - // in the next region (since we're matching on the end boundary). - guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1)); - return guideIndex; - } - - private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) { - int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR); - // If we found an exact match, return the index we found as the exclusiveKey won't be - // contained in the next region as with getIndexContainingInclusive. - guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex); - return guideIndex; - } - - private List<byte[]> getGuidePosts() { - /* - * Don't use guide posts if: - * 1) We're doing a point lookup, as HBase is fast enough at those - * to not need them to be further parallelized. TODO: pref test to verify - * 2) We're collecting stats, as in this case we need to scan entire - * regions worth of data to track where to put the guide posts. 
- */ - if (!useStats()) { - return Collections.emptyList(); - } - - List<byte[]> gps = null; - PTable table = getTable(); - Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts(); - byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable()); - if (table.getColumnFamilies().isEmpty()) { - // For sure we can get the defaultCF from the table - if (guidePostMap.get(defaultCF) != null) { - gps = guidePostMap.get(defaultCF).getGuidePosts(); - } - } else { - Scan scan = context.getScan(); - if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) { - // If default CF is not used in scan, use first CF referenced in scan - GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next()); - if (guidePostsInfo != null) { - gps = guidePostsInfo.getGuidePosts(); - } - } else { - // Otherwise, favor use of default CF. - if (guidePostMap.get(defaultCF) != null) { - gps = guidePostMap.get(defaultCF).getGuidePosts(); - } - } - } - if (gps == null) { - return Collections.emptyList(); - } - return gps; - } - - private static String toString(List<byte[]> gps) { - StringBuilder buf = new StringBuilder(gps.size() * 100); - buf.append("["); - for (int i = 0; i < gps.size(); i++) { - buf.append(Bytes.toStringBinary(gps.get(i))); - buf.append(","); - if (i > 0 && i < gps.size()-1 && (i % 10) == 0) { - buf.append("\n"); - } - } - buf.setCharAt(buf.length()-1, ']'); - return buf.toString(); - } - - private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) { - PTable table = getTable(); - boolean startNewScanList = false; - if (!plan.isRowKeyOrdered()) { - startNewScanList = true; - } else if (crossedRegionBoundary) { - if (table.getIndexType() == IndexType.LOCAL) { - startNewScanList = true; - } else if (table.getBucketNum() != null) { - startNewScanList = scans.isEmpty() || - ScanUtil.crossesPrefixBoundary(startKey, - ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), - SaltingUtil.NUM_SALTING_BYTES); - } - } - if (scan != null) { - scans.add(scan); - } - if (startNewScanList && !scans.isEmpty()) { - parallelScans.add(scans); - scans = Lists.newArrayListWithExpectedSize(1); - } - return scans; - } - - private List<List<Scan>> getParallelScans() throws SQLException { - return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); } - /** - * Compute the list of parallel scans to run for a given query. The inner scans - * may be concatenated together directly, while the other ones may need to be - * merge sorted, depending on the query. - * @return list of parallel scans to run for a given query. 
- * @throws SQLException - */ - private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException { - Scan scan = context.getScan(); - List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() - .getAllTableRegions(physicalTableName); - - List<byte[]> regionBoundaries = toBoundaries(regionLocations); - ScanRanges scanRanges = context.getScanRanges(); - PTable table = getTable(); - boolean isSalted = table.getBucketNum() != null; - boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; - List<byte[]> gps = getGuidePosts(); - if (logger.isDebugEnabled()) { - logger.debug("Guideposts: " + toString(gps)); - } - boolean traverseAllRegions = isSalted || isLocalIndex; - if (!traverseAllRegions) { - byte[] scanStartRow = scan.getStartRow(); - if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { - startKey = scanStartRow; - } - byte[] scanStopRow = scan.getStopRow(); - if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) { - stopKey = scanStopRow; - } - } - - int regionIndex = 0; - int stopIndex = regionBoundaries.size(); - if (startKey.length > 0) { - regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); - } - if (stopKey.length > 0) { - stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); - if (isLocalIndex) { - stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); - } - } - List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1); - - byte[] currentKey = startKey; - int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey); - int gpsSize = gps.size(); - int estGuidepostsPerRegion = gpsSize == 0 ? 
1 : gpsSize / regionLocations.size() + 1; - int keyOffset = 0; - List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion); - // Merge bisect with guideposts for all but the last region - while (regionIndex <= stopIndex) { - byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY; - if (regionIndex == stopIndex) { - endKey = stopKey; - } else { - endKey = regionBoundaries.get(regionIndex); - } - if (isLocalIndex) { - HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo(); - endRegionKey = regionInfo.getEndKey(); - keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); - } - while (guideIndex < gpsSize - && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) { - Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false); - scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false); - currentKey = currentGuidePost; - guideIndex++; - } - Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true); - if (isLocalIndex) { - if (newScan != null) { - newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); - } else if (!scans.isEmpty()) { - scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); - } - } - scans = addNewScan(parallelScans, scans, newScan, endKey, true); - currentKey = endKey; - regionIndex++; - } - if (!scans.isEmpty()) { // Add any remaining scans - parallelScans.add(scans); - } - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans, - ScanUtil.getCustomAnnotations(scan))); - } - return parallelScans; - } - - public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { - if (!reverse) { - return list; - } - return Lists.reverse(list); - } - - private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) { - if (!concatIterators.isEmpty()) { - iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators)); - } - } - /** - * Executes the scan in parallel across all regions, blocking until all scans are complete. - * @return the result iterators for the scan of each region - */ @Override - public List<PeekingResultIterator> getIterators() throws SQLException { - boolean success = false; - boolean isReverse = ScanUtil.isReversed(context.getScan()); - boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL; - final ConnectionQueryServices services = context.getConnection().getQueryServices(); - ReadOnlyProps props = services.getProps(); - int numSplits = size(); - List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); - List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); - // TODO: what purpose does this scanID serve? 
- final UUID scanId = UUID.randomUUID(); - try { - submitWork(scanId, scans, futures, splits.size()); - int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); - boolean clearedCache = false; - for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { - List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size()); - for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) { - try { - PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); - concatIterators.add(iterator); - } catch (ExecutionException e) { - try { // Rethrow as SQLException - throw ServerUtil.parseServerException(e); - } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date - List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2); - if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries - services.clearTableRegionCache(physicalTableName); - clearedCache = true; - } - // Resubmit just this portion of work again - Scan oldScan = scanPair.getFirst(); - byte[] startKey = oldScan.getStartRow(); - byte[] endKey = oldScan.getStopRow(); - if (isLocalIndex) { - endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY); - } - List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); - // Add any concatIterators that were successful so far - // as we need these to be in order - addConcatResultIterator(iterators, concatIterators); - concatIterators = Collections.emptyList(); - submitWork(scanId, newNestedScans, newFutures, newNestedScans.size()); - for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { - for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { - // Immediate do a get (not catching exception again) and then add the iterators we - // get back immediately. They'll be sorted as expected, since they're replacing the - // original one. 
- PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); - iterators.add(iterator); - } - } - } - } - } - addConcatResultIterator(iterators, concatIterators); - } - - success = true; - return iterators; - } catch (SQLException e) { - throw e; - } catch (Exception e) { - throw ServerUtil.parseServerException(e); - } finally { - if (!success) { - SQLCloseables.closeAllQuietly(iterators); - // Don't call cancel, as it causes the HConnection to get into a funk -// for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) { -// future.getSecond().cancel(true); -// } - } - } - } - - private static final class ScanLocation { - private final int outerListIndex; - private final int innerListIndex; - private final Scan scan; - public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) { - this.outerListIndex = outerListIndex; - this.innerListIndex = innerListIndex; - this.scan = scan; - } - public int getOuterListIndex() { - return outerListIndex; - } - public int getInnerListIndex() { - return innerListIndex; - } - public Scan getScan() { - return scan; - } - } - private void submitWork(final UUID scanId, List<List<Scan>> nestedScans, + protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) { - final ConnectionQueryServices services = context.getConnection().getQueryServices(); - ExecutorService executor = services.getExecutor(); // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor // will spray the scans across machines as opposed to targeting a // single one since the scans are in row key order. 
- List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize); + ExecutorService executor = context.getConnection().getQueryServices().getExecutor(); + List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize); for (int i = 0; i < nestedScans.size(); i++) { List<Scan> scans = nestedScans.get(i); List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size()); nestedFutures.add(futures); for (int j = 0; j < scans.size(); j++) { Scan scan = nestedScans.get(i).get(j); - scanLocations.add(new ScanLocation(scan, i, j)); + scanLocations.add(new ScanLocator(scan, i, j)); futures.add(null); // placeholder } } + // Shuffle so that we start execution across many machines + // before we fill up the thread pool Collections.shuffle(scanLocations); - for (ScanLocation scanLocation : scanLocations) { + for (ScanLocator scanLocation : scanLocations) { final Scan scan = scanLocation.getScan(); - Future<PeekingResultIterator> future = - executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { + Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { @@ -649,22 +110,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { } @Override - public int size() { - return this.scans.size(); - } - - @Override - public void explain(List<String> planSteps) { - boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean( - QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, - QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT); - StringBuilder buf = new StringBuilder(); - buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + "PARALLEL " + size() + "-WAY "); - explain(buf.toString(),planSteps); + protected String getName() { + return NAME; } - - @Override - public String toString() { - return "ParallelIterators [scans=" + scans + "]"; - } } \ No newline at end of file
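The net effect of the ParallelIterators change above is that the scan-chunking and iterator-collection machinery moves into the shared BaseResultIterators superclass, and the PARALLEL subclass keeps little more than its submitWork() override: it flattens the nested scan lists, shuffles the flattened list, submits each scan to the connection's ExecutorService, and slots each Future back into its original position so results are still consumed in row-key order. A minimal, self-contained sketch of that shuffle-then-submit pattern follows; ScanJob, the 3x2 job grid, and the string results are illustrative stand-ins and not Phoenix APIs.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ShuffledSubmitSketch {
        // Illustrative stand-in for one unit of scan work and where its result belongs.
        static final class ScanJob {
            final int outer, inner;
            ScanJob(int outer, int inner) { this.outer = outer; this.inner = inner; }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            int outerLists = 3, scansPerList = 2;
            // Pre-populate the nested futures with placeholders, then flatten the work.
            List<List<Future<String>>> nestedFutures = new ArrayList<List<Future<String>>>();
            List<ScanJob> jobs = new ArrayList<ScanJob>();
            for (int i = 0; i < outerLists; i++) {
                List<Future<String>> futures = new ArrayList<Future<String>>();
                nestedFutures.add(futures);
                for (int j = 0; j < scansPerList; j++) {
                    futures.add(null); // placeholder, filled in after the shuffle
                    jobs.add(new ScanJob(i, j));
                }
            }
            // Shuffle so the earliest submissions spread across many servers
            // instead of walking them in row-key order.
            Collections.shuffle(jobs);
            for (final ScanJob job : jobs) {
                Future<String> future = executor.submit(new Callable<String>() {
                    @Override public String call() { return "scan " + job.outer + "," + job.inner; }
                });
                // Slot the future back into its original position.
                nestedFutures.get(job.outer).set(job.inner, future);
            }
            // Consume results in the original (row-key) order.
            for (List<Future<String>> futures : nestedFutures) {
                for (Future<String> f : futures) {
                    System.out.println(f.get());
                }
            }
            executor.shutdown();
        }
    }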
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java index 3051608..ef2b534 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java @@ -20,8 +20,13 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.query.KeyRange; + public interface ResultIterators { - public List<PeekingResultIterator> getIterators() throws SQLException; public int size(); + public List<KeyRange> getSplits(); + public List<List<Scan>> getScans(); public void explain(List<String> planSteps); + public List<PeekingResultIterator> getIterators() throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java new file mode 100644 index 0000000..5cb64a0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.job.JobManager.JobCallable; +import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + + +/** + * + * Class that parallelizes the scan over a table using the ExecutorService provided. 
Each region of the table will be scanned in parallel with + * the results accessible through {@link #getIterators()} + * + * + * @since 0.1 + */ +public class SerialIterators extends BaseResultIterators { + private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class); + private static final String NAME = "SERIAL"; + private final ParallelIteratorFactory iteratorFactory; + private final int limit; + + public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) + throws SQLException { + super(plan, perScanLimit); + Preconditions.checkArgument(perScanLimit != null); // must be a limit specified + this.iteratorFactory = iteratorFactory; + this.limit = perScanLimit; + } + + @Override + protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans, + List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) { + // Pre-populate nestedFutures lists so that we can shuffle the scans + // and add the future to the right nested list. By shuffling the scans + // we get better utilization of the cluster since our thread executor + // will spray the scans across machines as opposed to targeting a + // single one since the scans are in row key order. + ExecutorService executor = context.getConnection().getQueryServices().getExecutor(); + + for (final List<Scan> scans : nestedScans) { + Scan firstScan = scans.get(0); + Scan lastScan = scans.get(scans.size()-1); + final Scan overallScan = ScanUtil.newScan(firstScan); + overallScan.setStopRow(lastScan.getStopRow()); + Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { + + @Override + public PeekingResultIterator call() throws Exception { + List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size()); + for (final Scan scan : scans) { + long startTime = System.currentTimeMillis(); + ResultIterator scanner = new TableResultIterator(context, tableRef, scan); + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); + } + concatIterators.add(iteratorFactory.newIterator(context, scanner, scan)); + } + PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); + return new LimitingPeekingResultIterator(concatIterator, limit); + } + + /** + * Defines the grouping for round robin behavior. All threads spawned to process + * this scan will be grouped together and time sliced with other simultaneously + * executing parallel scans. 
+ */ + @Override + public Object getJobId() { + return SerialIterators.this; + } + }, "Parallel scanner for table: " + tableRef.getTable().getName().getString())); + // Add our singleton Future which will execute serially + nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future))); + } + } + + @Override + protected String getName() { + return NAME; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 2a5080e..a343b48 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryServices; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 6a68df3..ed1ef70 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -30,7 +30,7 @@ import org.apache.phoenix.compile.IndexStatementRewriter; import org.apache.phoenix.compile.QueryCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4894b18..6c03780 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -149,6 +149,7 @@ public class QueryServicesOptions { public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05; public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min + public static final int DEFAULT_STATS_GUIDEPOST_PER_REGION = 0; // Uses guidepost width by default public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB public static final boolean 
DEFAULT_STATS_USE_CURRENT_TIME = true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 90c8324..9c85e63 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -25,11 +25,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.ByteUtil; @@ -74,21 +70,14 @@ public class StatisticsCollector { public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException { Configuration config = env.getConfiguration(); - HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES)); - int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0); - if (guidepostPerRegion > 0) { - long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize(); - if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set... 
- maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE; - } - guidepostDepth = maxFileSize / guidepostPerRegion; - } else { - guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); - } + int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); + long guidepostWidth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); + this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, env.getRegion().getTableDesc()); // Get the stats table associated with the current table on which the CP is // triggered - this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp); + this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp); } public long getMaxTimeStamp() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index d8ffd84..2a7047f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -24,6 +24,8 @@ import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -110,4 +112,19 @@ public class StatisticsUtil { } return PTableStats.EMPTY_STATS; } + + public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) { + if (guidepostPerRegion > 0) { + long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE; + if (tableDesc != null) { + long tableMaxFileSize = tableDesc.getMaxFileSize(); + if (tableMaxFileSize >= 0) { + maxFileSize = tableMaxFileSize; + } + } + return maxFileSize / guidepostPerRegion; + } else { + return guidepostWidth; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 6681042..f70c327 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -26,11 +26,13 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; @@ -43,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TimeKeeper; import com.google.protobuf.ServiceException; @@ -63,24 +66,31 @@ public class StatisticsWriter implements Closeable { * @throws IOException * if the table cannot be created due to an underlying HTable creation error */ - public static StatisticsWriter newWriter(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException { + public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); } - StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp); + HTableInterface statsWriterTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES)); + HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable); + StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName, clientTimeStamp); if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet statsTable.commitLastStatsUpdatedTime(); } return statsTable; } - private final HTableInterface statisticsTable; + private final HTableInterface statsWriterTable; + // In HBase 0.98.4 or above, the reader and writer will be the same. + // In pre HBase 0.98.4, there was a bug in using the HTable returned + // from a coprocessor for scans, so in that case it'll be different. + private final HTableInterface statsReaderTable; private final byte[] tableName; private final long clientTimeStamp; - private StatisticsWriter(HTableInterface statsTable, String tableName, long clientTimeStamp) { - this.statisticsTable = statsTable; - this.tableName = PDataType.VARCHAR.toBytes(tableName); + private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName, long clientTimeStamp) { + this.statsReaderTable = statsReaderTable; + this.statsWriterTable = statsWriterTable; + this.tableName = Bytes.toBytes(tableName); this.clientTimeStamp = clientTimeStamp; } @@ -89,7 +99,7 @@ public class StatisticsWriter implements Closeable { */ @Override public void close() throws IOException { - statisticsTable.close(); + statsWriterTable.close(); } public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { @@ -100,7 +110,7 @@ public class StatisticsWriter implements Closeable { } long readTimeStamp = useMaxTimeStamp ? 
HConstants.LATEST_TIMESTAMP : clientTimeStamp; byte[] famBytes = PDataType.VARCHAR.toBytes(fam); - Result result = StatisticsUtil.readRegionStatistics(statisticsTable, tableName, famBytes, p.getRegionName(), readTimeStamp); + Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp); if (result != null && !result.isEmpty()) { Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES); @@ -211,7 +221,7 @@ public class StatisticsWriter implements Closeable { mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m)); } MutateRowsRequest mrm = mrmBuilder.build(); - CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row); + CoprocessorRpcChannel channel = statsWriterTable.coprocessorService(row); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); try { @@ -235,7 +245,7 @@ public class StatisticsWriter implements Closeable { // Always use wallclock time for this, as it's a mechanism to prevent // stats from being collected too often. Put put = getLastStatsUpdatedTimePut(clientTimeStamp); - statisticsTable.put(put); + statsWriterTable.put(put); } public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index f1e625b..1ca246f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -33,6 +33,7 @@ import java.util.Properties; import javax.annotation.Nullable; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; @@ -69,6 +70,7 @@ import com.google.common.base.Preconditions; */ public class SchemaUtil { private static final int VAR_LENGTH_ESTIMATE = 10; + private static final int VAR_KV_LENGTH_ESTIMATE = 50; public static final String ESCAPE_CHARACTER = "\""; public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF; public static final PDatum VAR_BINARY_DATUM = new PDatum() { @@ -110,6 +112,28 @@ public class SchemaUtil { public static boolean isPKColumn(PColumn column) { return column.getFamilyName() == null; } + + /** + * Imperfect estimate of row size given a PTable + * TODO: keep row count in stats table and use total size / row count instead + * @param table + * @return estimate of size in bytes of a row + */ + public static long estimateRowSize(PTable table) { + int keyLength = estimateKeyLength(table); + long rowSize = 0; + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column)) { + PDataType type = column.getDataType(); + Integer maxLength = column.getMaxLength(); + int valueLength = !type.isFixedWidth() ? VAR_KV_LENGTH_ESTIMATE : maxLength == null ? 
type.getByteSize() : maxLength; + rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, column.getFamilyName().getBytes().length, column.getName().getBytes().length, valueLength); + } + } + // Empty key value + rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, QueryConstants.EMPTY_COLUMN_BYTES.length, 0); + return rowSize; + } /** * Estimate the max key length in bytes of the PK for a given table http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index 3327dba..7205faa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -35,6 +35,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.VersionUtil; +@SuppressWarnings("deprecation") public class ServerUtil { private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6"); @@ -133,19 +134,34 @@ public class ServerUtil { return null; } - @SuppressWarnings("deprecation") - public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, String tableName) throws IOException { - String versionString = env.getHBaseVersion(); - int version = VersionUtil.encodeVersion(versionString); - if (version >= COPROCESSOR_SCAN_WORKS) { - // The following *should* work, but doesn't due to HBASE-11837 which was fixed in 0.98.6 - return env.getTable(TableName.valueOf(tableName)); - } - // This code works around HBASE-11837 + private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) { + return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS); + } + + /* + * This code works around HBASE-11837 which causes HTableInterfaces retrieved from + * RegionCoprocessorEnvironment to not read local data. + */ + private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnvironment env, byte[] tableName) { // It's ok to not ever do a pool.close() as we're storing a single // table only. The HTablePool holds no other resources that this table // which will be closed itself when it's no longer needed. 
+ @SuppressWarnings("resource") HTablePool pool = new HTablePool(env.getConfiguration(),1); return pool.getTable(tableName); } + + public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, HTableInterface writerTable) throws IOException { + if (coprocessorScanWorks(env)) { + return writerTable; + } + return getTableFromSingletonPool(env, writerTable.getTableName()); + } + + public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, byte[] tableName) throws IOException { + if (coprocessorScanWorks(env)) { + return env.getTable(TableName.valueOf(tableName)); + } + return getTableFromSingletonPool(env, tableName); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java index 136a997..9dfcce5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java @@ -44,6 +44,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.PLongColumn; import org.apache.phoenix.schema.PName; @@ -140,6 +141,16 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest { @Override public void explain(List<String> planSteps) { } + + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } }; ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java index a135729..02fdcea 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java @@ -24,9 +24,10 @@ import java.sql.SQLException; import java.util.*; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; - +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.AssertResults; @@ -68,6 +69,15 @@ public class ConcatResultIteratorTest { public void explain(List<String> planSteps) { } + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } }; Tuple[] 
expectedResults = new Tuple[] { @@ -118,6 +128,15 @@ public class ConcatResultIteratorTest { public void explain(List<String> planSteps) { } + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } }; ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators); AssertResults.assertResults(scanner, expectedResults); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java index 9ff088e..095027c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java @@ -24,9 +24,10 @@ import java.sql.SQLException; import java.util.*; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; - +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.AssertResults; @@ -73,6 +74,15 @@ public class MergeSortResultIteratorTest { public void explain(List<String> planSteps) { } + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } }; ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators); AssertResults.assertResults(scanner, expectedResults); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java index 1e3df0b..6139aa5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java @@ -25,6 +25,7 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.Properties; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.junit.Test; @@ -183,5 +184,55 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest { } } } + + @Test + public void testTenantSpecificConnWithLimit() throws Exception { + String baseTableDDL = "CREATE TABLE BASE_MULTI_TENANT_TABLE(\n " + + " tenant_id VARCHAR(5) NOT NULL,\n" + + " userid INTEGER NOT NULL,\n" + + " username VARCHAR NOT NULL,\n" + + " col VARCHAR\n " + + " CONSTRAINT pk PRIMARY KEY (tenant_id, userid, username)) MULTI_TENANT=true"; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(baseTableDDL); + conn.close(); + + String tenantId = "tenantId"; + String tenantViewDDL = "CREATE VIEW TENANT_VIEW AS SELECT * FROM BASE_MULTI_TENANT_TABLE"; + Properties tenantProps = new Properties(); + tenantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + conn = DriverManager.getConnection(getUrl(), tenantProps); + 
conn.createStatement().execute(tenantViewDDL); + + String query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT 1"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertEquals("CLIENT SERIAL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + + " SERVER FILTER BY PageFilter 1\n" + + " SERVER 1 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs)); + query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT " + Integer.MAX_VALUE; + rs = conn.createStatement().executeQuery(query); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + + " SERVER FILTER BY PageFilter " + Integer.MAX_VALUE + "\n" + + " SERVER " + Integer.MAX_VALUE + " ROW LIMIT\n" + + "CLIENT " + Integer.MAX_VALUE + " ROW LIMIT", QueryUtil.getExplainPlan(rs)); + query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE username = 'Joe' LIMIT 1"; + rs = conn.createStatement().executeQuery(query); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + + " SERVER FILTER BY USERNAME = 'Joe'\n" + + " SERVER 1 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs)); + query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE col = 'Joe' LIMIT 1"; + rs = conn.createStatement().executeQuery(query); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + + " SERVER FILTER BY COL = 'Joe'\n" + + " SERVER 1 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs)); + } + + @Test + public void testLimitOnTenantSpecific() throws Exception { + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java index 2bff620..f6808a8 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java @@ -105,7 +105,7 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } - ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators); + ResultIterator iterator = ConcatResultIterator.newIterator(iterators); if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); }
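The new QueryPlanTest cases above pin down when the optimizer now chooses the serial path: a small LIMIT over a row-key-ordered range scan asserts a "CLIENT SERIAL 1-WAY ..." plan, while a very large LIMIT or an added filter keeps "CLIENT PARALLEL 1-WAY ...". Outside the test harness, the same check can be made over JDBC by reading the EXPLAIN output; in this sketch the connection URL and table name are placeholders to be adjusted for a real cluster.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExplainPlanCheck {
        public static void main(String[] args) throws Exception {
            // URL and table name are placeholders.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("EXPLAIN SELECT * FROM MY_TABLE LIMIT 1")) {
                StringBuilder plan = new StringBuilder();
                while (rs.next()) {
                    plan.append(rs.getString(1)).append('\n');
                }
                // A small LIMIT over a row-key-ordered scan should begin with
                // "CLIENT SERIAL ..."; otherwise the plan reads "CLIENT PARALLEL ...".
                System.out.print(plan);
            }
        }
    }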