[ 
https://issues.apache.org/jira/browse/PHOENIX-1779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14494530#comment-14494530
 ] 

James Taylor commented on PHOENIX-1779:
---------------------------------------

Looking good, [~samarthjain]. Here's some feedback and questions:

Can you add a row count check to this test? I noticed your other test has that 
already:
{code}
+    @Test
+    public void testUnionAllSelects() throws Exception {
+        Set<String> keySetA = createTableAndInsertRows("TABLEA", 10, true, true);
+        Set<String> keySetB = createTableAndInsertRows("TABLEB", 5, true, true);
+        Set<String> keySetC = createTableAndInsertRows("TABLEC", 7, false, true);
+        String query = "SELECT K FROM TABLEA UNION ALL SELECT K FROM TABLEB UNION ALL SELECT K FROM TABLEC";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(query);
+        stmt.setFetchSize(2); // force parallel fetch of scanner cache
+        ResultSet rs = stmt.executeQuery();
+        while (rs.next()) {
+            String key = rs.getString(1);
+            keySetA.remove(key);
+            keySetB.remove(key);
+            keySetC.remove(key);
+        }
+        assertEquals("Not all rows of tableA were returned", 0, keySetA.size());
+        assertEquals("Not all rows of tableB were returned", 0, keySetB.size());
+        assertEquals("Not all rows of tableC were returned", 0, keySetC.size());
+    }
+    
{code}
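
For illustration, here is a minimal, self-contained sketch of why the count matters: removing keys from a set passes even when a row comes back twice, while a row count catches it. RowCountCheckSketch and drainAndCount are hypothetical names, and a plain Iterator<String> stands in for the ResultSet. In the test above the expected total would presumably be 10 + 5 + 7 = 22, assuming createTableAndInsertRows inserts exactly the number of rows it is given.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class RowCountCheckSketch {
    /**
     * Drains the rows, removes each key from the expected set, and returns
     * the number of rows seen. Hypothetical helper, not part of the patch.
     */
    static int drainAndCount(Iterator<String> rows, Set<String> expectedKeys) {
        int count = 0;
        while (rows.hasNext()) {
            expectedKeys.remove(rows.next());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // "b" is returned twice, simulating a duplicated row.
        List<String> returned = Arrays.asList("a", "b", "b", "c");
        Set<String> expected = new HashSet<>(Arrays.asList("a", "b", "c"));

        int rowCount = drainAndCount(returned.iterator(), expected);

        // The set check alone passes, so the duplicate goes unnoticed.
        System.out.println("all keys seen: " + expected.isEmpty());
        // The count check exposes it: 4 rows came back for 3 keys.
        System.out.println("row count: " + rowCount);
    }
}
```

In the real test this would amount to incrementing a counter inside the while (rs.next()) loop and asserting on it after the loop.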

In RoundRobinResultIterator, does iterators.size() change because a split has 
occurred? Otherwise this looks like a race condition. If a split is the cause, 
would you mind adding a comment to that effect?
{code}
+                    // resize and replace the iterators list.
+                    size = openIterators.size();
+                    if (size > 0) {
+                        iterators = getIterators();
+                        // Possible that the number of iterators changed after the above call.
+                        size = iterators.size();
{code}

I don't think you need the Map<PeekingResultIterator, Integer> in 
RoundRobinResultIterator. You just need two parallel arrays: a 
PeekingResultIterator[] for the open iterators and an int[] with the number of 
records read for each open iterator. The index member variable will index into 
both. When an iterator is exhausted, you just remove it from the 
PeekingResultIterator[] and remove its record count from the int[].
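
A rough sketch of the two-parallel-arrays idea, under loud assumptions: a plain java.util.Iterator<String> stands in for PeekingResultIterator, and ParallelArraysSketch, openIterators, rowsRead and removeAt are illustrative names rather than anything from the patch.

```java
import java.util.Arrays;
import java.util.Iterator;

class ParallelArraysSketch {
    // Assumption: Iterator<String> plays the role of PeekingResultIterator.
    private Iterator<String>[] openIterators;
    private int[] rowsRead; // rowsRead[i] belongs to openIterators[i]
    private int index;      // current position in both arrays

    @SafeVarargs
    ParallelArraysSketch(Iterator<String>... iterators) {
        this.openIterators = iterators.clone();
        this.rowsRead = new int[iterators.length];
    }

    /** Returns the next row in round-robin order, or null when all are exhausted. */
    String next() {
        while (openIterators.length > 0) {
            if (index >= openIterators.length) {
                index = 0; // wrap around
            }
            Iterator<String> itr = openIterators[index];
            if (itr.hasNext()) {
                rowsRead[index]++;
                index++;
                return itr.next();
            }
            // Exhausted: drop the slot from both arrays. The same index now
            // refers to the following iterator, so it is not advanced.
            removeAt(index);
        }
        return null;
    }

    /** Removes slot i from both arrays so they stay aligned. */
    private void removeAt(int i) {
        int n = openIterators.length - 1;
        Iterator<String>[] its = Arrays.copyOf(openIterators, n);
        int[] counts = Arrays.copyOf(rowsRead, n);
        System.arraycopy(openIterators, i + 1, its, i, n - i);
        System.arraycopy(rowsRead, i + 1, counts, i, n - i);
        openIterators = its;
        rowsRead = counts;
    }

    public static void main(String[] args) {
        ParallelArraysSketch rr = new ParallelArraysSketch(
                Arrays.asList("a1", "a2").iterator(),
                Arrays.asList("b1").iterator());
        for (String row = rr.next(); row != null; row = rr.next()) {
            System.out.println(row);
        }
    }
}
```

Copying the arrays on every removal costs time proportional to the number of open iterators, which should be acceptable if that number is small. Keeping a logical size and shifting elements in place would avoid the allocation.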

I don't think you need the PrefetchedRecordsIterator (you knew I'd find that 
instanceof check, didn't you? :-) ). Instead, call fetchNextBatch() from next() 
when numScannersCacheExhausted == openIterators.length. You can submit the 
tasks as you do now, but have each one just return the next Tuple. You'd keep 
a parallel Tuple[] member variable for the results, and adjust your 
PeekingResultIterator[] and rowsRead[] wherever a Tuple comes back null. Then 
add a counter member variable holding how many Tuples you have left. next() 
counts it down and doesn't call the underlying next() until the counter 
reaches zero.
{code}
+        for (final PeekingResultIterator itr : iterators) {
+            Future<Tuple> future = executor
+                    .submit(new Callable<Tuple>() {
+                        @Override
+                        public Tuple call() throws Exception {
+                            // Read the next record to refill the scanner's cache.
+                            return itr.next();
+                        }
+                    });
+            futures.add(future);
{code}
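
To make the bookkeeping concrete, here is a simplified sketch of the prefetched-results array plus counter. It is not the patch's code: Iterator<String> stands in for PeekingResultIterator, String for Tuple, and PrefetchSketch, nextOrNull, prefetched and remaining are made-up names. It also treats every round as a refill, leaving out the round-robin reads from each scanner's cache between refills and the rowsRead[] counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PrefetchSketch {
    // Assumption: mimics an iterator whose next() returns null when exhausted.
    static String nextOrNull(Iterator<String> itr) {
        return itr.hasNext() ? itr.next() : null;
    }

    private final ExecutorService executor;
    private List<Iterator<String>> open;          // still-open iterators
    private String[] prefetched = new String[0];  // stands in for Tuple[]
    private int remaining;                        // prefetched rows not yet handed out

    PrefetchSketch(ExecutorService executor, List<Iterator<String>> iterators) {
        this.executor = executor;
        this.open = new ArrayList<>(iterators);
    }

    /** Hands out prefetched rows; only refills once the counter is zero. */
    String next() {
        if (remaining == 0) {
            fetchNextBatch();
        }
        if (remaining == 0) {
            return null; // every iterator is exhausted
        }
        return prefetched[prefetched.length - remaining--];
    }

    /** Submits one next() per open iterator and keeps only non-null results. */
    private void fetchNextBatch() {
        List<Future<String>> futures = new ArrayList<>();
        for (final Iterator<String> itr : open) {
            Callable<String> task = () -> nextOrNull(itr);
            futures.add(executor.submit(task));
        }
        List<Iterator<String>> stillOpen = new ArrayList<>();
        List<String> rows = new ArrayList<>();
        try {
            for (int i = 0; i < futures.size(); i++) {
                String row = futures.get(i).get();
                if (row != null) { // a null result means that iterator is done
                    rows.add(row);
                    stillOpen.add(open.get(i));
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        open = stillOpen;
        prefetched = rows.toArray(new String[0]);
        remaining = prefetched.length;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            PrefetchSketch it = new PrefetchSketch(executor, Arrays.asList(
                    Arrays.asList("a1", "a2").iterator(),
                    Arrays.asList("b1").iterator()));
            for (String row = it.next(); row != null; row = it.next()) {
                System.out.println(row);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the caller only ever sees plain rows from next(), no wrapper iterator type is needed, and so no instanceof check.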

Minor nit: can you rename QueryPlan.isRoundRobinPossible() to 
QueryPlan.useRoundRobinIterator()?

Please add a series of tests in QueryCompilerTest (or a new unit test), 
similar to QueryCompilerTest.testGroupByOrderPreserving() and 
testNotGroupByOrderPreserving(), that confirm for a given query whether 
QueryPlan.useRoundRobinIterator() returns true or false.





> Parallelize fetching of next batch of records for scans corresponding to 
> queries with no order by 
> --------------------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-1779
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-1779
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Samarth Jain
>            Assignee: Samarth Jain
>         Attachments: PHOENIX-1779.patch, wip.patch, wip3.patch, 
> wipwithsplits.patch
>
>
> Today in Phoenix we parallelize the first execution of scans i.e. we load 
> only the first batch of records up to the scan's cache size in parallel. 
> Loading of subsequent batches of records in scanners is essentially serial. 
> This could be improved, especially for queries, including the ones with no 
> order by clauses, that do not need any kind of merge sort on the client. 
> This could also potentially improve the performance of UPSERT SELECT 
> statements that load data from one table and insert into another. One such 
> use case is creating immutable indexes for tables that already have data. 
> It could also potentially improve the performance of our MapReduce solution 
> for bulk loading data by improving the speed of the loading/mapping phase. 


