Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 bb8d7cd0d -> 6e75b6aff


PHOENIX-2949 Fix estimated region size when checking for serial query


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6e75b6af
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6e75b6af
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6e75b6af

Branch: refs/heads/4.x-HBase-0.98
Commit: 6e75b6aff59583517208031e2ed3117e705abdc5
Parents: bb8d7cd
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Thu Jun 23 13:57:52 2016 -0700
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Thu Jun 23 13:57:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/execute/ScanPlan.java    | 46 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 +-
 2 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6e75b6af/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index c55a1cc..0975b3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -25,7 +25,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -62,7 +62,6 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -118,7 +117,7 @@ public class ScanPlan extends BaseQueryPlan {
         Scan scan = context.getScan();
         /*
          * If a limit is provided and we have no filter, run the scan serially when we estimate that
-         * the limit's worth of data will fit into a single region.
+         * the limit's worth of data is less than the threshold bytes provided in QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD
          */
         Integer perScanLimit = !allowPageFilter ? null : limit;
         if (perScanLimit == null || scan.getFilter() != null) {
@@ -127,32 +126,35 @@ public class ScanPlan extends BaseQueryPlan {
         long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : 
context.getConnection().getSCN();
         PTableStats tableStats = 
context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(),
 scn);
         GuidePostsInfo gpsInfo = 
tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
-        long estRowSize = SchemaUtil.estimateRowSize(table);
-        long estRegionSize;
+        ConnectionQueryServices services = 
context.getConnection().getQueryServices();
+        long estRowSize;
+        long estimatedParallelThresholdBytes;
         if (gpsInfo == null) {
-            // Use guidepost depth as minimum size
-            ConnectionQueryServices services = 
context.getConnection().getQueryServices();
-            HTableDescriptor desc = 
services.getTableDescriptor(table.getPhysicalName().getBytes());
-            int guidepostPerRegion = 
services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
-            long guidepostWidth = 
services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-            estRegionSize = 
StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+            estRowSize = SchemaUtil.estimateRowSize(table);
+            estimatedParallelThresholdBytes = 
services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
+                    HConstants.DEFAULT_MAX_FILE_SIZE);
         } else {
-            // Region size estimated based on total number of bytes divided by 
number of regions
             long totByteSize = 0;
+            long totRowCount = 0;
             for (long byteCount : gpsInfo.getByteCounts()) {
                 totByteSize += byteCount;
             }
-            estRegionSize = totByteSize / (gpsInfo.getGuidePostsCount()+1);
+            for (long rowCount : gpsInfo.getRowCounts()) {
+                totRowCount += rowCount;
+            }
+            estRowSize = totByteSize / totRowCount;
+            estimatedParallelThresholdBytes = 2
+                    * 
services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                            
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
         }
-        // TODO: configurable number of bytes?
-        boolean isSerial = (perScanLimit * estRowSize < estRegionSize);
-        
-        if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("With LIMIT=" + perScanLimit
-                + ", estimated row size=" + estRowSize
-                + ", estimated region size=" + estRegionSize + " (" + (gpsInfo 
== null ? "without " : "with ") + "stats)"
-                + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution", 
context.getConnection()));
+        long limitThreshold = 
services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD,
+                estimatedParallelThresholdBytes);
+        boolean isSerial = (perScanLimit * estRowSize < limitThreshold);
+
+        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations(
+                "With LIMIT=" + perScanLimit + ", estimated row size=" + 
estRowSize + ", limitThreshold="
+                        + limitThreshold + ": " + (isSerial ? "SERIAL" : 
"PARALLEL") + " execution",
+                context.getConnection()));
         return isSerial;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6e75b6af/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 110dbf0..196686d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -213,7 +213,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String HCONNECTION_POOL_CORE_SIZE = 
"hbase.hconnection.threads.core";
     public static final String HCONNECTION_POOL_MAX_SIZE = 
"hbase.hconnection.threads.max";
     public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max";
-
+    public static final String QUERY_PARALLEL_LIMIT_THRESHOLD = "phoenix.query.parallelThresholdBytes";
     // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index)
     public static final String INDEX_POPULATION_SLEEP_TIME = 
"phoenix.index.population.wait.time";
 

Reply via email to