In StreamingAggregate, don't sort if there are no keys
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5f4f3bd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5f4f3bd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5f4f3bd1 Branch: refs/heads/master Commit: 5f4f3bd1b94ad265a363146adcb884e9473a84fa Parents: 75351da Author: Steven Phillips <[email protected]> Authored: Tue Sep 3 11:33:27 2013 -0700 Committer: Steven Phillips <[email protected]> Committed: Tue Sep 3 17:34:29 2013 -0700 ---------------------------------------------------------------------- .../drill/exec/client/QuerySubmitter.java | 64 +++++++++++--------- .../apache/drill/exec/opt/BasicOptimizer.java | 9 ++- .../org/apache/drill/jdbc/DrillHandler.java | 1 + 3 files changed, 45 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5f4f3bd1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index f74d14b..4e4d32e 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -4,6 +4,7 @@ import com.beust.jcommander.internal.Lists; import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; import com.google.common.io.Resources; +import org.apache.commons.lang.StringUtils; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.exception.SchemaChangeException; 
@@ -58,13 +59,15 @@ public class QuerySubmitter { Stopwatch watch = new Stopwatch(); watch.start(); client.runQuery(queryType, plan, listener); - System.out.println(String.format("Got %d total records in %f seconds", listener.await(), (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000)); + int rows = listener.await(); + System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000)); return 0; } private class QueryResultsListener implements UserResultsListener { AtomicInteger count = new AtomicInteger(); private CountDownLatch latch = new CountDownLatch(1); + RecordBatchLoader loader = new RecordBatchLoader(new BootStrapContext(DrillConfig.create()).getAllocator()); @Override public void submissionFailed(RpcException ex) { System.out.println(String.format("Query failed: %s", ex)); @@ -74,37 +77,44 @@ public class QuerySubmitter { @Override public void resultArrived(QueryResultBatch result) { int rows = result.getHeader().getRowCount(); - RecordBatchLoader loader = new RecordBatchLoader(new BootStrapContext(DrillConfig.create()).getAllocator()); - try { - loader.load(result.getHeader().getDef(), result.getData()); - } catch (SchemaChangeException e) { - submissionFailed(new RpcException(e)); - } - List<String> columns = Lists.newArrayList(); - for (VectorWrapper vw : loader) { - columns.add(vw.getValueVector().getField().getName()); - } - for (int row = 0; row < rows; row++) { - if (row%50 == 0) { - for (String column : columns) { - System.out.printf("| %-15s", column.length() <= 15 ? 
column : column.substring(0, 14)); - } - System.out.printf("|\n"); + if (result.getData() != null) { + try { + loader.load(result.getHeader().getDef(), result.getData()); + } catch (SchemaChangeException e) { + submissionFailed(new RpcException(e)); } + List<String> columns = Lists.newArrayList(); for (VectorWrapper vw : loader) { - Object o = vw.getValueVector().getAccessor().getObject(row); - if (o instanceof byte[]) { - String value = new String((byte[]) o); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); - } else { - String value = o.toString(); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14)); + columns.add(vw.getValueVector().getField().getName()); + } + for (int row = 0; row < rows; row++) { + if (row%50 == 0) { + System.out.println(StringUtils.repeat("-", columns.size()*17 + 1)); + for (String column : columns) { + System.out.printf("| %-15s", column.length() <= 15 ? column : column.substring(0, 14)); + } + System.out.printf("|\n"); + System.out.println(StringUtils.repeat("-", columns.size()*17 + 1)); + } + for (VectorWrapper vw : loader) { + Object o = vw.getValueVector().getAccessor().getObject(row); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + count.addAndGet(1); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } else { + String value = o.toString(); + count.addAndGet(1); + System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0,14)); + } } + System.out.printf("|\n"); } - System.out.printf("|\n"); } - count.addAndGet(rows); - if (result.getHeader().getIsLastChunk()) latch.countDown(); + if (result.getHeader().getIsLastChunk()) { +// System.out.println(StringUtils.repeat("-", columns.size()*17 + 1)); + latch.countDown(); + } result.release(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5f4f3bd1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 9f54dd0..697495b 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -147,8 +147,13 @@ public class BasicOptimizer extends Optimizer{ orderDefs.add(new OrderDef(Direction.ASC, e)); } Sort sort = new Sort(segment.getInput().accept(this, value), orderDefs, false); - - StreamingAggregate sa = new StreamingAggregate(sort, keys.toArray(new NamedExpression[keys.size()]), agg.getAggregations(), 1.0f); + + StreamingAggregate sa; + if (keys.size() == 0) { + sa = new StreamingAggregate(segment.getInput().accept(this, value), new NamedExpression[0], agg.getAggregations(), 1.0f); + } else { + sa = new StreamingAggregate(sort, keys.toArray(new NamedExpression[keys.size()]), agg.getAggregations(), 1.0f); + } SelectionVectorRemover svm = new SelectionVectorRemover(sa); return svm; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5f4f3bd1/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java ---------------------------------------------------------------------- diff --git 
a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java index be5ab9c..bd88641 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java @@ -109,6 +109,7 @@ public class DrillHandler extends HandlerImpl { } catch (Exception ex) { System.out.println(ex); + logger.error("Failure while setting up jdbc handler", ex); throw new SQLException("Failure trying to connect to Drill.", ex); } }
