Repository: phoenix
Updated Branches:
  refs/heads/3.0 343d9262c -> f714932f0


PHOENIX-1103 ChunkedResultIterator compat with hash joins

Remove the special case for hash joins in the
ChunkedResultIterator. Once the chunk size is reached, continue
scanning until the row key changes. This commit also adds a
specific test in the HashJoinIT to test with a small chunk size.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f714932f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f714932f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f714932f

Branch: refs/heads/3.0
Commit: f714932f0c72989e2382aace29dbfb304fb24db5
Parents: 343d926
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jul 21 18:06:05 2014 +0200
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Wed Jul 23 15:45:17 2014 +0200

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 78 +++++++++++++++++++-
 .../phoenix/iterate/ChunkedResultIterator.java  | 39 ++++++----
 .../org/apache/phoenix/join/HashJoinInfo.java   | 11 ---
 .../phoenix/query/QueryServicesOptions.java     |  5 +-
 4 files changed, 105 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f714932f/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 26c9455..b94ba99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -104,7 +104,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         }
     }
     
-    @Parameters(name="{0}")
+    @Parameters
     public static Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
         testCases.add(new String[][] {
@@ -1783,7 +1783,83 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         String query = "SELECT \"order_id\", i.name, s.name, quantity, date 
FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
             + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" 
RIGHT JOIN "
             + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = 
s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size
+    // to test that repeated row keys within a single chunk are handled properly
+    @Test
+    public void testMultiRightJoin_SmallChunkSize() throws Exception {
+        String query = "SELECT \"order_id\", i.name, s.name, quantity, date 
FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = 
i.\"item_id\" RIGHT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = 
s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
         Properties props = new Properties(TEST_PROPERTIES);
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
             PreparedStatement statement = conn.prepareStatement(query);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f714932f/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 92080eb..cfaca84 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.PhoenixIOException;
-import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
@@ -60,19 +59,11 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
 
         @Override
         public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner) throws SQLException {
-            // TODO It doesn't seem right to do this selection here, but it's not currently clear
-            // where a better place is to do it
-            // For a HashJoin the scan can't be restarted where it left off, so we don't use
-            // a ChunkedResultIterator
-            if (HashJoinInfo.isHashJoin(context.getScan())) {
-                return delegateFactory.newIterator(context, scanner);
-            } else {
-               scanner.close(); //close the iterator since we don't need it 
anymore.
-                return new ChunkedResultIterator(delegateFactory, context, 
tableRef,
-                        
context.getConnection().getQueryServices().getProps().getLong(
-                                            
QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                            
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
-            }
+            scanner.close(); //close the iterator since we don't need it 
anymore.
+            return new ChunkedResultIterator(delegateFactory, context, 
tableRef,
+                    
context.getConnection().getQueryServices().getProps().getLong(
+                                        QueryServices.SCAN_RESULT_CHUNK_SIZE,
+                                        
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
         }
     }
 
@@ -136,6 +127,7 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
     private static class SingleChunkResultIterator implements ResultIterator {
 
         private int rowCount = 0;
+        private boolean chunkComplete;
         private boolean endOfStreamReached;
         private Tuple lastTuple;
         private final ResultIterator delegate;
@@ -153,6 +145,14 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
             }
             Tuple next = delegate.next();
             if (next != null) {
+                // We actually keep going past the chunk size until the row key changes. This is
+                // necessary for (at least) hash joins, as they can return multiple rows with the
+                // same row key. Stopping a chunk at a row key boundary is necessary in order to
+                // be able to start the next chunk on the next row key
+                if (rowCount >= chunkSize && rowKeyChanged(lastTuple, next)) {
+                    chunkComplete = true;
+                    return null;
+                }
                 lastTuple = next;
                 rowCount++;
             } else {
@@ -175,7 +175,7 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
          * Returns true if the current chunk has been fully iterated over.
          */
         public boolean isChunkComplete() {
-            return rowCount == chunkSize;
+            return chunkComplete;
         }
 
         /**
@@ -193,5 +193,14 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
             lastTuple.getKey(keyPtr);
             return keyPtr.get();
         }
+
+        private boolean rowKeyChanged(Tuple lastTuple, Tuple newTuple) {
+            ImmutableBytesWritable oldKeyPtr = new ImmutableBytesWritable();
+            ImmutableBytesWritable newKeyPtr = new ImmutableBytesWritable();
+            lastTuple.getKey(oldKeyPtr);
+            newTuple.getKey(newKeyPtr);
+
+            return oldKeyPtr.compareTo(newKeyPtr) != 0;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f714932f/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index ce336b8..d00e802 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -224,15 +224,4 @@ public class HashJoinInfo {
             }
         }
     }
-
-    /**
-     * Check if a scan is intended for completing a HashJoin.
-     *
-     * @param scan the scan to be checked
-     * @return {@code true} if the scan is to be used for a HashJoin, otherwise {@code false}
-     */
-    public static boolean isHashJoin(Scan scan) {
-        return scan.getAttribute(HASH_JOIN) != null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f714932f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 108dad4..b4db032 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -100,7 +100,10 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 
1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
-    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 1000L;
+    // We make the default chunk size one row smaller than the default scan cache size because
+    // one extra row is typically read and discarded by the ChunkedResultIterator, and we don't
+    // want to fill up a whole new cache to read a single extra record
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 
DEFAULT_SCAN_CACHE_SIZE - 1L;
     
     // 
     // Spillable GroupBy - SPGBY prefix

Reply via email to