Repository: incubator-drill Updated Branches: refs/heads/master 08a9a90d9 -> 2d10afd4b
DRILL-396: QuerySubmitter enhancements Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2ed7a678 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2ed7a678 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2ed7a678 Branch: refs/heads/master Commit: 2ed7a678b1caef67279a775d7e8b217e3e18041f Parents: 08a9a90 Author: Steven Phillips <[email protected]> Authored: Tue Mar 11 08:26:13 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue Mar 11 08:26:13 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/client/QuerySubmitter.java | 101 +++++++++++++++---- .../org/apache/drill/exec/util/VectorUtil.java | 34 +++++++ 2 files changed, 116 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ed7a678/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index b07b3ae..0253069 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -20,10 +20,13 @@ package org.apache.drill.exec.client; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.exception.SchemaChangeException; @@ -64,33 +67,47 @@ public class QuerySubmitter { jc.usage(); System.exit(0); } - System.exit(submitter.submitQuery(o.location, o.planType, o.zk, o.local, o.bits)); + + System.exit(submitter.submitQuery(o.location, o.queryString, o.planType, o.zk, o.local, o.bits, o.format)); } static class Options { - @Parameter(names = {"-f"}, description = "file containing plan", required=true) + @Parameter(names = {"-f, --file"}, description = "file containing plan", required=false) public String location = null; - @Parameter(names = {"-t"}, description = "type of plan, logical/physical", required=true) + @Parameter(names = {"-q", "-e", "--query"}, description = "query string", required = false) + public String queryString = null; + + @Parameter(names = {"-t", "--type"}, description = "type of query, sql/logical/physical", required=true) public String planType; - @Parameter(names = {"-zk"}, description = "zookeeper connect string.", required=false) + @Parameter(names = {"-z", "--zookeeper"}, description = "zookeeper connect string.", required=false) public String zk = "localhost:2181"; - @Parameter(names = {"-local"}, description = "run query in local mode", required=false) + @Parameter(names = {"-l", "--local"}, description = "run query in local mode", required=false) public boolean local; - @Parameter(names = "-bits", description = "number of drillbits to run. local mode only", required=false) + @Parameter(names = {"-b", "--bits"}, description = "number of drillbits to run. local mode only", required=false) public int bits = 1; - @Parameter(names = {"-h", "-help", "--help"}, description = "show usage", help=true) + @Parameter(names = {"-h", "--help"}, description = "show usage", help=true) public boolean help = false; + + @Parameter(names = {"--format"}, description = "output format, csv,tsv,table", required = false) + public String format = "table"; } - public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception { + public enum Format { + TSV, CSV, TABLE + } + + public int submitQuery(String planLocation, String queryString, String type, String zkQuorum, boolean local, int bits, String format) throws Exception { DrillConfig config = DrillConfig.create(); DrillClient client = null; - + + Preconditions.checkArgument(!(planLocation == null && queryString == null), "Must provide either query file or query string"); + Preconditions.checkArgument(!(planLocation != null && queryString != null), "Must provide either query file or query string, not both"); + try{ if (local) { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); @@ -106,29 +123,62 @@ public class QuerySubmitter { client = new DrillClient(config, clusterCoordinator); } client.connect(); - QueryResultsListener listener = new QueryResultsListener(); - String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString(); + QueryResultsListener listener; + String plan; + if (queryString == null) { + plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString(); + } else { + plan = queryString; + } + String[] queries; UserProtos.QueryType queryType; type = type.toLowerCase(); switch(type) { case "sql": queryType = UserProtos.QueryType.SQL; + queries = plan.split(";"); break; case "logical": queryType = UserProtos.QueryType.LOGICAL; + queries = new String[]{ plan }; break; case "physical": queryType = UserProtos.QueryType.PHYSICAL; + queries = new String[]{ plan }; break; default: System.out.println("Invalid query type: " + type); return -1; } + Format outputFormat; + format = format.toLowerCase(); + switch(format) { + case "csv": + outputFormat = Format.CSV; + break; + case "tsv": + outputFormat = Format.TSV; + break; + case "table": + outputFormat = Format.TABLE; + break; + default: + System.out.println("Invalid format type: " + format); + return -1; + } Stopwatch watch = new Stopwatch(); - watch.start(); - client.runQuery(queryType, plan, listener); - int rows = listener.await(); - System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000)); + for (String query : queries) { + listener = new QueryResultsListener(outputFormat); + watch.start(); + client.runQuery(queryType, query, listener); + int rows = listener.await(); + System.out.println(String.format("%d record%s selected (%f seconds)", rows, rows > 1 ? "s" : "", (float) watch.elapsed(TimeUnit.MILLISECONDS) / (float) 1000)); + if (query != queries[queries.length - 1]) { + System.out.println(); + } + watch.stop(); + watch.reset(); + } return 0; }finally{ if(client != null) client.close(); @@ -139,7 +189,11 @@ public class QuerySubmitter { AtomicInteger count = new AtomicInteger(); private CountDownLatch latch = new CountDownLatch(1); RecordBatchLoader loader = new RecordBatchLoader(new BootStrapContext(DrillConfig.create()).getAllocator()); - int width; + Format format; + + public QueryResultsListener(Format format) { + this.format = format; + } @Override public void submissionFailed(RpcException ex) { @@ -157,12 +211,21 @@ public class QuerySubmitter { } catch (SchemaChangeException e) { submissionFailed(new RpcException(e)); } - - VectorUtil.showVectorAccessibleContent(loader); + + switch(format) { + case TABLE: + VectorUtil.showVectorAccessibleContent(loader); + break; + case TSV: + VectorUtil.showVectorAccessibleContent(loader, "\t"); + break; + case CSV: + VectorUtil.showVectorAccessibleContent(loader, ","); + break; + } } if (result.getHeader().getIsLastChunk()) { - //System.out.println(StringUtils.repeat("-", width*17 + 1)); latch.countDown(); } result.release(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ed7a678/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java index 553e50d..a35bb5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java @@ -29,6 +29,40 @@ import com.beust.jcommander.internal.Lists; public class VectorUtil { + public static void showVectorAccessibleContent(VectorAccessible va, final String delimiter) { + + int rows = va.getRecordCount(); + List<String> columns = Lists.newArrayList(); + for (VectorWrapper vw : va) { + columns.add(vw.getValueVector().getField().getName()); + } + + int width = columns.size(); + for (String column : columns) { + System.out.printf("%s%s",column, column == columns.get(width - 1) ? "\n" : delimiter); + } + for (int row = 0; row < rows; row++) { + int columnCounter = 0; + for (VectorWrapper vw : va) { + boolean lastColumn = columnCounter == width - 1; + Object o = vw.getValueVector().getAccessor().getObject(row); + if (o == null) { + //null value + String value = "null"; + System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter); + } + else if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter); + } else { + String value = o.toString(); + System.out.printf("%s%s", value, lastColumn ? "\n" : delimiter); + } + columnCounter++; + } + } + } + public static void showVectorAccessibleContent(VectorAccessible va) { int rows = va.getRecordCount();
