Repository: phoenix
Updated Branches:
  refs/heads/3.0 5ca432b2d -> 6d1476225


PHOENIX-1188 Performance regression for non-aggregate queries


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6d147622
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6d147622
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6d147622

Branch: refs/heads/3.0
Commit: 6d1476225fedf58c46c2263344462aa98fc2d9ff
Parents: 5ca432b
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Aug 21 00:04:12 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Aug 21 00:04:12 2014 -0700

----------------------------------------------------------------------
 .../phoenix/iterate/ChunkedResultIterator.java  | 83 +++++++-------------
 .../phoenix/query/QueryServicesOptions.java     |  9 ++-
 2 files changed, 35 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d147622/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index d7fbe79..c702e99 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -29,10 +29,13 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * {@code PeekingResultIterator} implementation that loads data in chunks. 
This is intended for
  * basic scan plans, to avoid loading large quantities of data from HBase in 
one go.
@@ -41,7 +44,7 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
     private static final Logger logger = 
LoggerFactory.getLogger(ChunkedResultIterator.class);
 
     private final ParallelIterators.ParallelIteratorFactory 
delegateIteratorFactory;
-    private SingleChunkResultIterator singleChunkResultIterator;
+    private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
     private final StatementContext context;
     private final TableRef tableRef;
     private Scan scan;
@@ -71,12 +74,19 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
     }
 
     public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory 
delegateIteratorFactory,
-            StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize) {
+            StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
         this.scan = scan;
         this.chunkSize = chunkSize;
+        // Instantiate single chunk iterator and the delegate iterator in 
constructor
+        // to get parallel scans kicked off in separate threads. If we delay 
this,
+        // we'll get serialized behavior (see PHOENIX-
+        if (logger.isDebugEnabled()) logger.debug("Get first chunked result 
iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
+        ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(
+                new TableResultIterator(context, tableRef, scan), chunkSize);
+        resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan);
     }
 
     @Override
@@ -96,26 +106,16 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
 
     @Override
     public void close() throws SQLException {
-        if (resultIterator != null) {
-            resultIterator.close();
-        }
-        if (singleChunkResultIterator != null) {
-            singleChunkResultIterator.close();
-        }
+        resultIterator.close();
     }
 
     private PeekingResultIterator getResultIterator() throws SQLException {
-        if (resultIterator == null) {
-            if (logger.isDebugEnabled()) logger.debug("Get first chunked 
result iterator over " + tableRef.getTable().getName().getString() + " with " + 
scan);
-            singleChunkResultIterator = new SingleChunkResultIterator(
-                    new TableResultIterator(context, tableRef, scan), 
chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan);
-        } else if (resultIterator.peek() == null && 
!singleChunkResultIterator.isEndOfStreamReached()) {
-            singleChunkResultIterator.close();
+        if (resultIterator.peek() == null && lastKey != null) {
+            resultIterator.close();
             scan = ScanUtil.newScan(scan);
-            scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), 
new byte[]{0}));
+            scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             if (logger.isDebugEnabled()) logger.debug("Get next chunked result 
iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
-            singleChunkResultIterator = new SingleChunkResultIterator(
+            ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(
                     new TableResultIterator(context, tableRef, scan), 
chunkSize);
             resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan);
         }
@@ -125,23 +125,22 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
     /**
      * ResultIterator that runs over a single chunk of results (i.e. a portion 
of a scan).
      */
-    private static class SingleChunkResultIterator implements ResultIterator {
+    private class SingleChunkResultIterator implements ResultIterator {
 
         private int rowCount = 0;
         private boolean chunkComplete;
-        private boolean endOfStreamReached;
-        private Tuple lastTuple;
         private final ResultIterator delegate;
         private final long chunkSize;
 
         private SingleChunkResultIterator(ResultIterator delegate, long 
chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
             this.delegate = delegate;
             this.chunkSize = chunkSize;
         }
 
         @Override
         public Tuple next() throws SQLException {
-            if (isChunkComplete() || isEndOfStreamReached()) {
+            if (chunkComplete || lastKey == null) {
                 return null;
             }
             Tuple next = delegate.next();
@@ -150,14 +149,15 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
                 // necessary for (at least) hash joins, as they can return 
multiple rows with the
                 // same row key. Stopping a chunk at a row key boundary is 
necessary in order to
                 // be able to start the next chunk on the next row key
-                if (rowCount >= chunkSize && rowKeyChanged(lastTuple, next)) {
+                if (rowCount == chunkSize) {
+                    next.getKey(lastKey);
+                } else if (rowCount > chunkSize && rowKeyChanged(next)) {
                     chunkComplete = true;
                     return null;
                 }
-                lastTuple = next;
                 rowCount++;
             } else {
-                endOfStreamReached = true;
+                lastKey = null;
             }
             return next;
         }
@@ -172,36 +172,13 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
             delegate.close();
         }
 
-        /**
-         * Returns true if the current chunk has been fully iterated over.
-         */
-        public boolean isChunkComplete() {
-            return chunkComplete;
-        }
-
-        /**
-         * Returns true if the end of all chunks has been reached.
-         */
-        public boolean isEndOfStreamReached() {
-            return endOfStreamReached;
-        }
-
-        /**
-         * Returns the last-encountered key.
-         */
-        public byte[] getLastKey() {
-            ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
-            lastTuple.getKey(keyPtr);
-            return keyPtr.get();
-        }
-
-        private boolean rowKeyChanged(Tuple lastTuple, Tuple newTuple) {
-            ImmutableBytesWritable oldKeyPtr = new ImmutableBytesWritable();
-            ImmutableBytesWritable newKeyPtr = new ImmutableBytesWritable();
-            lastTuple.getKey(oldKeyPtr);
-            newTuple.getKey(newKeyPtr);
+        private boolean rowKeyChanged(Tuple newTuple) {
+            byte[] currentKey = lastKey.get();
+            int offset = lastKey.getOffset();
+            int length = lastKey.getLength();
+            newTuple.getKey(lastKey);
 
-            return oldKeyPtr.compareTo(newKeyPtr) != 0;
+            return Bytes.compareTo(currentKey, offset, length, lastKey.get(), 
lastKey.getOffset(), lastKey.getLength()) != 0;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d147622/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 99bd7ef..04f31e6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -100,10 +100,11 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 
1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
-    // We make the default chunk size one row smaller than the default scan 
cache size because
-    // one extra row is typically read and discarded by the 
ChunkedResultIterator, and we don't
-    // want to fill up a whole new cache to read a single extra record
-    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 
DEFAULT_SCAN_CACHE_SIZE - 1L;
+    // Only the first chunked batches are fetched in parallel, so this default
+    // should be on the relatively bigger side of things. Bigger means more
+    // latency and client-side spooling/buffering. Smaller means less initial
+    // latency and less parallelization.
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999;
     
     // 
     // Spillable GroupBy - SPGBY prefix

Reply via email to