Repository: phoenix Updated Branches: refs/heads/4.0 eb26ca4ed -> 441b396fc
PHOENIX-1446 Add/fix logging for LIMIT optimization Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/441b396f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/441b396f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/441b396f Branch: refs/heads/4.0 Commit: 441b396fc9c6fa7d26d6be3ccedbae05ee533245 Parents: eb26ca4 Author: James Taylor <[email protected]> Authored: Wed Nov 12 16:40:03 2014 -0800 Committer: James Taylor <[email protected]> Committed: Wed Nov 12 16:40:03 2014 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/execute/ScanPlan.java | 7 +++++ .../phoenix/iterate/BaseResultIterators.java | 33 ++++++++++++-------- .../phoenix/iterate/ParallelIterators.java | 5 ++- .../apache/phoenix/iterate/SerialIterators.java | 6 ++-- .../apache/phoenix/jdbc/PhoenixStatement.java | 19 ++++++++++- .../java/org/apache/phoenix/util/QueryUtil.java | 19 +++++++++-- 6 files changed, 65 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 578855d..00cc90c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -54,6 +54,8 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @@ -65,6 +67,7 @@ import org.apache.phoenix.util.SchemaUtil; * @since 0.1 */ public class ScanPlan extends BaseQueryPlan { + private static final Logger logger = LoggerFactory.getLogger(ScanPlan.class); private List<KeyRange> splits; private List<List<Scan>> scans; private boolean allowPageFilter; @@ -149,6 +152,10 @@ public class ScanPlan extends BaseQueryPlan { if (perScanLimit * estRowSize < estRegionSize) { isSerial = true; } + if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit + + ", estimated row size=" + estRowSize + + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)" + + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution"); } ResultIterators iterators; if (isSerial) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index ade83db..7785c54 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -97,6 +97,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private final PTableStats tableStats; private final byte[] physicalTableName; private final QueryPlan plan; + protected final String scanId; // TODO: too much nesting here - breakup into new classes. private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures; @@ -139,6 +140,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result physicalTableName = table.getPhysicalName().getBytes(); tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; Scan scan = context.getScan(); + // Used to tie all the scans together during logging + scanId = UUID.randomUUID().toString(); if (projector.isProjectEmptyKeyValue()) { Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); // If nothing projected into scan and we only have one column family, just allow everything @@ -475,10 +478,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (!scans.isEmpty()) { // Add any remaining scans parallelScans.add(scans); } - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans, - ScanUtil.getCustomAnnotations(scan))); - } return parallelScans; } @@ -495,8 +494,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result */ @Override public List<PeekingResultIterator> getIterators() throws SQLException { + Scan scan = context.getScan(); + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " + this, + ScanUtil.getCustomAnnotations(scan))); + } boolean success = false; - boolean isReverse = ScanUtil.isReversed(context.getScan()); + boolean isReverse = ScanUtil.isReversed(scan); boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL; final ConnectionQueryServices services = context.getConnection().getQueryServices(); ReadOnlyProps props = services.getProps(); @@ -505,10 +509,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); allFutures.add(futures); SQLException toThrow = null; - // TODO: what purpose does this scanID serve? - final UUID scanId = UUID.randomUUID(); try { - submitWork(scanId, scans, futures, splits.size()); + submitWork(scans, futures, splits.size()); int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); boolean clearedCache = false; for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { @@ -538,7 +540,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // as we need these to be in order addIterator(iterators, concatIterators); concatIterators = Collections.emptyList(); - submitWork(scanId, newNestedScans, newFutures, newNestedScans.size()); + submitWork(newNestedScans, newFutures, newNestedScans.size()); allFutures.add(newFutures); for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { @@ -602,7 +604,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { - cancelledWork |= futurePair.getSecond().cancel(false); + if (futurePair != null) { // FIXME: null check should not be necessary + Future<PeekingResultIterator> future = futurePair.getSecond(); + if (future != null) { + cancelledWork |= future.cancel(false); + } + } } } } @@ -640,8 +647,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); - abstract protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans, - List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize); + abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, + int estFlattenedSize); @Override public int size() { @@ -660,6 +667,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result @Override public String toString() { - return "ResultIterators [name=" + getName() + ",scans=" + scans + "]"; + return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]"; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index bde3f78..d16160c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -20,7 +20,6 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.Collections; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -57,8 +56,8 @@ public class ParallelIterators extends BaseResultIterators { } @Override - protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans, - List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) { + protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, + int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 5cb64a0..502cdf8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -20,14 +20,12 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.Collections; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.LogUtil; @@ -62,8 +60,8 @@ public class SerialIterators extends BaseResultIterators { } @Override - protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans, - List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) { + protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, + int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index c369be8..1eae037 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -117,10 +117,14 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixContextExecutor; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; import com.google.common.collect.ListMultimap; @@ -146,6 +150,8 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement { + private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class); + public enum Operation { QUERY("queried", false), DELETE("deleted", true), @@ -217,7 +223,12 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho PhoenixStatement.this, plan); // this will create its own trace internally, so we don't wrap this // whole thing in tracing - PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector()); + ResultIterator resultIterator = plan.iterator(); + if (logger.isDebugEnabled()) { + String explainPlan = QueryUtil.getExplainPlan(resultIterator); + logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection)); + } + PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector()); resultSets.add(rs); setLastQueryPlan(plan); setLastResultSet(rs); @@ -1000,12 +1011,18 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } public MutationPlan compileMutation(String sql) throws SQLException { + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Execute update: " + sql, connection)); + } CompilableStatement stmt = parseStatement(sql); return compileMutation(stmt, sql); } @Override public ResultSet executeQuery(String sql) throws SQLException { + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Execute query: " + sql, connection)); + } CompilableStatement stmt = parseStatement(sql); if (stmt.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(sql); http://git-wip-us.apache.org/repos/asf/phoenix/blob/441b396f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index 7f5d4c6..1bc702a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -34,18 +34,17 @@ import javax.annotation.Nullable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import org.apache.phoenix.query.QueryServices; public final class QueryUtil { @@ -187,6 +186,20 @@ public final class QueryUtil { return buf.toString(); } + public static String getExplainPlan(ResultIterator iterator) throws SQLException { + List<String> steps = Lists.newArrayList(); + iterator.explain(steps); + StringBuilder buf = new StringBuilder(); + for (String step : steps) { + buf.append(step); + buf.append('\n'); + } + if (buf.length() > 0) { + buf.setLength(buf.length()-1); + } + return buf.toString(); + } + public static Connection getConnection(Configuration conf) throws ClassNotFoundException, SQLException { return getConnection(new Properties(), conf);
