Limit width to 5x the number of endpoints
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b03740e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b03740e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b03740e8 Branch: refs/heads/master Commit: b03740e897e17638ad49ff72a55931d777cfe7dd Parents: 78a6197 Author: Steven Phillips <[email protected]> Authored: Thu Aug 29 19:34:13 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sun Sep 1 15:16:21 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/planner/fragment/Wrapper.java | 1 + .../drill/exec/store/TestParquetPhysicalPlan.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b03740e8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 8c4b0b4..7e2497e 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -152,6 +152,7 @@ public class Wrapper { endpoints.add(all.get(i % div)); } } else { + width = Math.min(width, values.size()*5); // get nodes with highest affinity. Collections.sort(values); values = Lists.reverse(values); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b03740e8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java index c58bc71..e1344f4 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java @@ -36,6 +36,7 @@ import java.nio.charset.Charset; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static junit.framework.Assert.assertNull; import static org.junit.Assert.assertEquals; @@ -85,6 +86,7 @@ public class TestParquetPhysicalPlan { } private class ParquetResultsListener implements UserResultsListener { + AtomicInteger count = new AtomicInteger(); private CountDownLatch latch = new CountDownLatch(1); @Override public void submissionFailed(RpcException ex) { @@ -94,12 +96,16 @@ public class TestParquetPhysicalPlan { @Override public void resultArrived(QueryResultBatch result) { - System.out.printf("Result batch arrived. Number of records: %d", result.getHeader().getRowCount()); + int rows = result.getHeader().getRowCount(); + System.out.println(String.format("Result batch arrived. Number of records: %d", rows)); + count.addAndGet(rows); if (result.getHeader().getIsLastChunk()) latch.countDown(); + result.release(); } - public void await() throws Exception { + public int await() throws Exception { latch.await(); + return count.get(); } } @Test @@ -111,7 +117,7 @@ public class TestParquetPhysicalPlan { client.connect(); ParquetResultsListener listener = new ParquetResultsListener(); client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener); - listener.await(); + System.out.println(String.format("Got %d total records.", listener.await())); client.close(); } }
