Updated Branches: refs/heads/master 425147735 -> a88e2620d
DRILL-5 - First kinda working version of plan interpreter. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a88e2620 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a88e2620 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a88e2620 Branch: refs/heads/master Commit: a88e2620d1ea437c3203df82cd8d9f2d56a4427f Parents: 4251477 Author: tdunning <[email protected]> Authored: Tue Oct 16 19:25:03 2012 -0700 Committer: tdunning <[email protected]> Committed: Tue Oct 16 19:25:03 2012 -0700 ---------------------------------------------------------------------- .../main/antlr3/org/apache/drill/plan/ast/Plan.g | 6 + .../org/apache/drill/plan/PhysicalInterpreter.java | 81 +++++++- .../main/java/org/apache/drill/plan/ast/Arg.java | 68 ++++++ .../main/java/org/apache/drill/plan/ast/Plan.java | 59 ++++-- .../plan/physical/operators/ArithmeticOp.java | 167 ++++++++++++++- .../apache/drill/plan/physical/operators/Bind.java | 56 +++++- .../drill/plan/physical/operators/ConstantOp.java | 44 ++++- .../plan/physical/operators/DataListener.java | 26 ++- .../plan/physical/operators/EvalOperator.java | 31 +++- .../drill/plan/physical/operators/Filter.java | 80 +++++++- .../drill/plan/physical/operators/InvalidData.java | 29 ++- .../drill/plan/physical/operators/Operator.java | 121 ++++++++++- .../drill/plan/physical/operators/ScanJson.java | 121 ++++++++++- .../drill/plan/physical/operators/Schema.java | 31 ++- .../plan/physical/operators/SchemaListener.java | 26 ++- .../java/org/apache/drill/plan/ParsePlanTest.java | 4 +- .../plan/physical/operators/ScanJsonTest.java | 35 +++- sandbox/plan-parser/src/test/resources/data1.json | 4 + 18 files changed, 886 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/antlr3/org/apache/drill/plan/ast/Plan.g ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/antlr3/org/apache/drill/plan/ast/Plan.g b/sandbox/plan-parser/src/main/antlr3/org/apache/drill/plan/ast/Plan.g index 65c7172..47afb11 100644 --- a/sandbox/plan-parser/src/main/antlr3/org/apache/drill/plan/ast/Plan.g +++ b/sandbox/plan-parser/src/main/antlr3/org/apache/drill/plan/ast/Plan.g @@ -48,6 +48,7 @@ args returns [List<Arg> r]: a = arg {$r = Lists.newArrayList($a.r);} ( COMMA b = arg returns [Arg r]: s = STRING {$r = Arg.createString($s.text);} + | z = HEX_LONG {$r = Arg.createLong($z.text);} | n = NUMBER {$r = Arg.createNumber($n.text);} | b = BOOLEAN {$r = Arg.createBoolean($b.text);} | s = SYMBOL {$r = Arg.createSymbol($s.text);} @@ -59,6 +60,11 @@ STRING @after { paraphrase.pop(); } : ('"'|'\u201c') ( ~('"' | '\\') | '\\' .)* ('"'|'\u201d') ; +HEX_LONG +@init { paraphrase.push("a binary string"); } +@after { paraphrase.pop(); } + : '0x' ( '0'..'9' | 'A'..'F' | 'a'..'f')+ ; + GETS @init { paraphrase.push(":="); } @after { paraphrase.pop(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java index b09989f..dd25507 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/PhysicalInterpreter.java @@ -1,11 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; +import org.apache.drill.plan.ast.Plan; +import org.apache.drill.plan.physical.operators.DataListener; +import org.apache.drill.plan.physical.operators.Operator; +import org.apache.drill.plan.physical.operators.OperatorReference; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 4:39 PM - * To change this template use File | Settings | File Templates. + * Takes a physical plan and interprets it locally. The goal here is to provide reference + * semantics, not a high-speed evaluation of a query.
*/ -public class PhysicalInterpreter { +public class PhysicalInterpreter implements DataListener { + private final List<Operator> ops; + + public PhysicalInterpreter(Plan prog) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + Map<Integer, OperatorReference> bindings = Maps.newHashMap(); + ops = Lists.newArrayList(); + + for (Op op : prog.getStatements()) { + ops.add(Operator.create(op, bindings)); + } + + Iterator<Op> i = prog.getStatements().iterator(); + for (Operator op : ops) { + op.link(i.next(), bindings); + } + + Collection<Arg> outputs = prog.getOutputs(); + for (Arg output : outputs) { + bindings.get(output.asSymbol().getInt()).getOp().addDataListener(this); + } + } + + public void run() throws InterruptedException, ExecutionException { + ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ops.size())); // a pool size of 0 is rejected, so an empty plan still gets one thread + List<Future<Object>> tasks; + try { tasks = pool.invokeAll(ops); } finally { pool.shutdown(); } // release worker threads even if invokeAll is interrupted + + for (Future<Object> task : tasks) { + System.out.printf("%s\n", task.get()); + } + } + + + @Override + public void notify(Object r) { + System.out.printf("out = %s\n", r); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Arg.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Arg.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Arg.java index 1890bfd..f7c433a 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Arg.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Arg.java @@ -22,16 +22,44 @@ public class Arg { return new Symbol(Integer.parseInt(percent.trimLeadingFrom(s))); } + public static Arg createLong(String s) { + return new Number(Long.parseLong(s.substring(2), 16)); + } + public static Arg createBoolean(String b) { return new BooleanConstant(Boolean.parseBoolean(b)); } + public
Symbol asSymbol() { + return (Symbol) this; + } + + public String asString() { + return ((QuotedString) this).s; + } + public static class QuotedString extends Arg { private String s; public QuotedString(String s) { this.s = quotes.trimFrom(quotes.trimFrom(s)); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof QuotedString)) return false; + + QuotedString that = (QuotedString) o; + + return !(s != null ? !s.equals(that.s) : that.s != null); + + } + + @Override + public int hashCode() { + return s != null ? s.hashCode() : 0; + } } public static class BooleanConstant extends Arg { @@ -40,6 +68,16 @@ public class Arg { public BooleanConstant(boolean b) { v = b; } + + @Override + public boolean equals(Object o) { + return this == o || o instanceof BooleanConstant && v == ((BooleanConstant) o).v; + } + + @Override + public int hashCode() { + return (v ? 1 : 0); + } } public static class Number extends Arg { @@ -48,6 +86,21 @@ public class Arg { public Number(double v) { value = v; } + + public double doubleValue() { + return value; + } + + @Override + public boolean equals(Object o) { + return this == o || o instanceof Number && Double.compare(((Number) o).value, value) == 0; + } + + @Override + public int hashCode() { + long temp = value != +0.0d ? 
Double.doubleToLongBits(value) : 0L; + return (int) (temp ^ (temp >>> 32)); + } } public static class Symbol extends Arg { @@ -60,5 +113,20 @@ public class Arg { public int getSlot() { return slot; } + + public int getInt() { + return slot; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + return o instanceof Symbol && slot == ((Symbol) o).slot; + } + + @Override + public int hashCode() { + return slot; + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Plan.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Plan.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Plan.java index a0425a9..f0253fd 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Plan.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/ast/Plan.java @@ -1,29 +1,50 @@ package org.apache.drill.plan.ast; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.Collection; import java.util.List; +import java.util.Set; /** - * Created with IntelliJ IDEA. User: tdunning Date: 10/12/12 Time: 7:41 PM To change this template - * use File | Settings | File Templates. + * Represents an SSA plan. No meaning is attached to any of the operators, nor is the + * mechanism for connected inputs and outputs defined. 
*/ public class Plan { - private List<Op> statements = Lists.newArrayList(); - - public static Plan create(Op first) { - Plan r = new Plan(); - return r.add(first); - } - - public Plan add(Op next) { - if (next != null) { - statements.add(next); - } - return this; - } - - public List<Op> getStatements() { - return statements; - } + private List<Op> statements = Lists.newArrayList(); + + public static Plan create(Op first) { + Plan r = new Plan(); + return r.add(first); + } + + public Plan add(Op next) { + if (next != null) { + statements.add(next); + } + return this; + } + + public List<Op> getStatements() { + return statements; + } + + /** + * Returns a collection of the outputs in a plan that are never consumed. + * @return A collection of Arg's that are used as outputs but never as inputs. + */ + public Collection<Arg> getOutputs() { + Set<Arg> outputs = Sets.newHashSet(); + + for (Op op : statements) { + outputs.addAll(op.getOutputs()); + } + + for (Op op : statements) { + outputs.removeAll(op.getInputs()); + } + + return outputs; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java index 3a1b235..b994628 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ArithmeticOp.java @@ -1,11 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.util.List; +import java.util.Map; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 5:41 PM - * To change this template use File | Settings | File Templates. + * Operator that works on two numbers. */ -public class ArithmeticOp { +public abstract class ArithmeticOp extends EvalOperator { + public static void define() { + Operator.defineOperator(">", GT.class); + Operator.defineOperator("==", EQUALS.class); + Operator.defineOperator("<", LT.class); + Operator.defineOperator("+", PLUS.class); + Operator.defineOperator("-", MINUS.class); + Operator.defineOperator("*", TIMES.class); + Operator.defineOperator("/", DIVIDE.class); + } + + public EvalOperator left, right; + + public ArithmeticOp(Op op, Map<Integer, OperatorReference> bindings) { + checkArity(op, 2, 1); + + // bind our output + bindings.put(op.getOutputs().get(0).asSymbol().getInt(), new OperatorReference(this, 0)); + } + + @Override + public Object eval(Object data) { + Object x = left.eval(data); + Object y = right.eval(data); + + if (x instanceof Number) { + if (y instanceof Number) { + return eval(((Number) x).doubleValue(), ((Number) y).doubleValue()); + } else { + throw new InvalidData("Expected number but got %s (a %s)", y, y.getClass()); + } 
+ } else { + throw new InvalidData("Expected number but got %s (a %s)", x, x.getClass()); + } + } + + @Override + public void link(Op op, Map<Integer, OperatorReference> bindings) { + checkArity(op, 2, 1); + + List<Arg> in = op.getInputs(); + left = extractOperand(in.get(0), bindings); + right = extractOperand(in.get(1), bindings); + } + + private EvalOperator extractOperand(Arg arg, Map<Integer, ? extends OperatorReference> bindings) { + if (arg instanceof Arg.Number) { + return new ConstantOp(((Arg.Number) arg).doubleValue()); + } else if (arg instanceof Arg.Symbol) { + return (EvalOperator) bindings.get(arg.asSymbol().getInt()).getOp(); + } else { + throw new IllegalArgumentException("Wanted constant or reference to another operator"); + } + } + + public abstract Object eval(double x, double y); + + public static class GT extends ArithmeticOp { + public GT(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x > y; + } + } + + public static class LT extends ArithmeticOp { + public LT(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x < y; + } + } + + public static class EQUALS extends ArithmeticOp { + public EQUALS(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x == y; + } + } + + public static class PLUS extends ArithmeticOp { + public PLUS(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x + y; + } + } + + + public static class MINUS extends ArithmeticOp { + public MINUS(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x - y; + } + } + + + public static class TIMES extends ArithmeticOp { + public 
TIMES(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x * y; + } + } + + + public static class DIVIDE extends ArithmeticOp { + public DIVIDE(Op op, Map<Integer, OperatorReference> bindings) { + super(op, bindings); + } + + @Override + public Object eval(double x, double y) { + return x / y; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java index 3febb9c..ca99afc 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Bind.java @@ -1,11 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.util.List; +import java.util.Map; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 5:40 PM - * To change this template use File | Settings | File Templates. + * How to lookup a variable in an expression. */ -public class Bind { +public class Bind extends EvalOperator { + private Schema schema = null; + + public static void define() { + Operator.defineOperator("bind", Bind.class); + } + + private String name; + + public Bind(Op op, Map<Integer, OperatorReference> bindings) { + checkArity(op, 2, 1); + List<Arg> out = op.getOutputs(); + bindings.put(out.get(0).asSymbol().getInt(), new OperatorReference(this, 0)); + } + + @Override + public void link(Op op, Map<Integer, OperatorReference> bindings) { + // connect to our inputs + name = op.getInputs().get(0).asString(); + schema = bindings.get(op.getInputs().get(1).asSymbol().getInt()).getOp().getSchema(); + } + + @Override + public Object eval(Object data) { + return schema.get(name, data); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ConstantOp.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ConstantOp.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ConstantOp.java index 97d7cf7..60ed918 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ConstantOp.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ConstantOp.java @@ -1,11 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Op; + +import java.util.Map; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 6:05 PM - * To change this template use File | Settings | File Templates. + * Evaluates to a constant value. 
*/ -public class ConstantOp { +public class ConstantOp extends EvalOperator { + private Object value; + + public ConstantOp(double v) { + value = v; + } + + @Override + public void link(Op op, Map<Integer, OperatorReference> bindings) { + // ignore + } + + @Override + public Object eval(Object data) { + return value; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/DataListener.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/DataListener.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/DataListener.java index 6e706a8..3648454 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/DataListener.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/DataListener.java @@ -1,11 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; /** - * Created with IntelliJ IDEA. 
- * User: tdunning - * Date: 10/15/12 - * Time: 4:43 PM - * To change this template use File | Settings | File Templates. + * Describes a consumer of data. */ -public class DataListener { +public interface DataListener { + public void notify(Object r); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java index bf6ec22..dfaea7b 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/EvalOperator.java @@ -1,11 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 5:32 PM - * To change this template use File | Settings | File Templates. + * Describes a scalar expression. 
*/ -public interface EvalOperator { +public abstract class EvalOperator extends Operator { + public abstract Object eval(Object data); + + @Override + public Schema getSchema() { + throw new UnsupportedOperationException("Can't get schema from expression"); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Filter.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Filter.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Filter.java index 9a77870..c677faf 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Filter.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Filter.java @@ -1,11 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.util.List; +import java.util.Map; + /** - * Created with IntelliJ IDEA. 
- * User: tdunning - * Date: 10/15/12 - * Time: 5:26 PM - * To change this template use File | Settings | File Templates. + * Selects records according to whether an expression evaluated on the records is true or non-zero. */ -public class Filter { +public class Filter extends Operator implements DataListener { + + private Operator data; + + public static void define() { + Operator.defineOperator("filter", Filter.class); + } + + private EvalOperator filterExpression = null; + + public Filter(Op op, Map<Integer, OperatorReference> bindings) { + checkArity(op, 2, 1); + List<Arg> outputs = op.getOutputs(); + if (outputs.size() != 1) { + throw new IllegalArgumentException("filter operator should only have one output"); + } + bindings.put(outputs.get(0).asSymbol().getInt(), new OperatorReference(this, 0)); + } + + @Override + public void link(Op op, Map<Integer, OperatorReference> bindings) { + List<Arg> inputs = op.getInputs(); + if (inputs.size() != 2) { + throw new IllegalArgumentException("filter requires two inputs, a filter-expression and a data source. 
Got " + inputs.size()); + } + filterExpression = (EvalOperator) bindings.get(inputs.get(0).asSymbol().getInt()).getOp(); + data = bindings.get(inputs.get(1).asSymbol().getInt()).getOp(); + data.addDataListener(this); + } + + @Override + public void notify(Object record) { // named record so it does not shadow the upstream-operator field "data" + Object r = filterExpression.eval(record); + if (r instanceof Number) { + if (((Number) r).doubleValue() != 0) { + emit(record); + } + } else if (r instanceof Boolean) { + if ((Boolean) r) { + emit(record); + } + } else { + throw new InvalidData("Filter expression gave %s (a %s), wanted number or boolean", r, r == null ? null : r.getClass()); + } + } + + @Override + public Schema getSchema() { + return data.getSchema(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/InvalidData.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/InvalidData.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/InvalidData.java index 16f22b2..cf66f6e 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/InvalidData.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/InvalidData.java @@ -1,14 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; /** -* Created with IntelliJ IDEA. -* User: tdunning -* Date: 10/15/12 -* Time: 5:39 PM -* To change this template use File | Settings | File Templates. +* Thrown when data of the wrong type is encountered. */ -class InvalidData extends Throwable { +class InvalidData extends RuntimeException { public InvalidData(String msg) { super(msg); } + + public InvalidData(String format, Object... args) { + super(String.format(format, args)); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java index c3ac775..4462508 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Operator.java @@ -1,11 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 4:41 PM - * To change this template use File | Settings | File Templates. + * Implements a function for an operator on a single line of the physical plan. + * <p> + * The life cycle of an operator is + * <ol> + * <li>The operator's class is registered under a name using Operator.defineOperator</li> + * <li>The operator is constructed via Operator.create. It is expected that + * the operator will fill in references to its own outputs into the DAG bindings</li> + * <li>The operator is linked by a call to its link() method. At this point, the + * operator can look at its arguments and resolve references to its inputs. + * This is when it should add itself as a data listener and when it should request + * any schema that it needs from upstream operators.</li> + * <li>The operator's call() method is called. Most operators should simply return at this + * point, but data sources should start calling emit with data records.</li> + * <li>The operator will be notified of incoming data.
It should process this data + * and emit the result.</li> + * </ol> */ -public class Operator { +public abstract class Operator implements Callable<Object> { + private static final Map<String, Class<? extends Operator>> operatorMap = Maps.newHashMap(); + + static { + ArithmeticOp.define(); + Bind.define(); + Filter.define(); + ScanJson.define(); + } + + + public static void defineOperator(String name, Class<? extends Operator> clazz) { + if (operatorMap.containsKey(name)) { + throw new RuntimeException(String.format("Duplicate operator name for %s vs %s", clazz, operatorMap.get(name))); + } + operatorMap.put(name, clazz); + } + + public static Operator create(Op op, Map<Integer, OperatorReference> bindings) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException { + Class<? extends Operator> c = operatorMap.get(op.getOp()); + if (c == null) { + throw new IllegalArgumentException(String.format("No such operators as %s", op.getOp())); + } + + Constructor<? extends Operator> con = c.getConstructor(Op.class, Map.class); + return con.newInstance(op, bindings); + } + + protected final List<DataListener> dataOut = Lists.newArrayList(); + protected final List<SchemaListener> schemaOut = Lists.newArrayList(); + + public void addDataListener(DataListener listener) { + this.dataOut.add(listener); + } + + protected void emit(Object r) { + for (DataListener listener : dataOut) { + listener.notify(r); + } + } + + protected void emitSchema(Schema schema) { + for (SchemaListener listener : schemaOut) { + listener.notify(schema); + } + } + + public double eval() { + throw new UnsupportedOperationException("default no can do");
+ } + + public abstract void link(Op op, Map<Integer, OperatorReference> bindings); + + public Object call() throws Exception { + // do nothing + return null; + } + + public abstract Schema getSchema(); + + protected void checkArity(Op op, int inputArgs, int outputArgs) { + List<Arg> in = op.getInputs(); + if (in.size() != inputArgs) { + throw new IllegalArgumentException("bind should have exactly two arguments (an expression and a data source)"); + } + + List<Arg> out = op.getOutputs(); + if (out.size() != outputArgs) { + throw new IllegalArgumentException("bind should have exactly one output"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java index 329fc25..1f020ba 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/ScanJson.java @@ -1,11 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import com.google.common.io.Files; +import com.google.common.io.InputSupplier; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonStreamParser; +import org.apache.drill.plan.ast.Arg; +import org.apache.drill.plan.ast.Op; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.List; +import java.util.Map; + /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 4:41 PM - * To change this template use File | Settings | File Templates. + * Reads JSON formatted records from a file. */ -public class ScanJson { +public class ScanJson extends Operator { + public static void define() { + Operator.defineOperator("scan-json", ScanJson.class); + } + + private InputSupplier<InputStreamReader> input; + + public ScanJson(Op op, Map<Integer, OperatorReference> bindings) { + List<Arg> in = op.getInputs(); + if (in.size() != 1) { + throw new IllegalArgumentException("scan-json should have exactly one argument (a file name)"); + } + input = Files.newReaderSupplier(new File(in.get(0).asString()), Charsets.UTF_8); + + List<Arg> out = op.getOutputs(); + if (out.size() != 1) { + throw new IllegalArgumentException("scan-json should have exactly one output"); + } + bindings.put(out.get(0).asSymbol().getInt(), new OperatorReference(this, 0)); + } + + public ScanJson(InputSupplier<InputStreamReader> input) throws IOException { + this.input = input; + } + + public static ScanJson create(InputSupplier<InputStreamReader> input) throws IOException { + return new ScanJson(input); + } + + @Override + public void link(Op next, Map<Integer, OperatorReference> 
bindings) { + // nothing to look for + } + + @Override + public Object call() throws IOException { + JsonSchema schema = new JsonSchema(); + emitSchema(schema); + + int count = 0; + Reader in = input.getInput(); + JsonStreamParser jp = new JsonStreamParser(in); + while (jp.hasNext()) { + JsonElement r = jp.next(); + emit(r); + count++; + } + in.close(); + return count; + } + + @Override + public Schema getSchema() { + return new JsonSchema(); + } + + private class JsonSchema extends Schema { + Splitter onDot = Splitter.on("."); + + @Override + public Object get(String name, Object data) { + Iterable<String> bits = onDot.split(name); + for (String bit : bits) { + data = ((JsonObject) data).get(bit); + } + if (data instanceof JsonPrimitive) { + JsonPrimitive v = (JsonPrimitive) data; + if (v.isNumber()) { + return v.getAsDouble(); + } else if (v.isString()) { + return v.getAsString(); + } else { + return v.getAsBoolean(); + } + } else { + return data; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java index 7dab2f1..e38a792 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/Schema.java @@ -1,12 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; /** -* Created with IntelliJ IDEA. -* User: tdunning -* Date: 10/15/12 -* Time: 4:46 PM -* To change this template use File | Settings | File Templates. -*/ -public class Schema { - + * Describes a schema. In this context a schema is what understands how to get data + * out of a record. For JSON, the schema is pretty dumb, but for other data types it + * could be quite clever. + */ +public abstract class Schema { + public abstract Object get(String name, Object data); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/SchemaListener.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/SchemaListener.java b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/SchemaListener.java index 17d1b8b..719c065 100644 --- a/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/SchemaListener.java +++ b/sandbox/plan-parser/src/main/java/org/apache/drill/plan/physical/operators/SchemaListener.java @@ -1,11 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; /** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 4:45 PM - * To change this template use File | Settings | File Templates. + * Describes an object that accepts updates on schema's. */ -public class SchemaListener { +public interface SchemaListener { + public void notify(Schema s); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java index 32f64df..8ecc7d1 100644 --- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java +++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/ParsePlanTest.java @@ -32,9 +32,7 @@ import java.io.InputStreamReader; import java.util.Iterator; import java.util.List; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static junit.framework.Assert.*; public class ParsePlanTest { @Test 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/test/java/org/apache/drill/plan/physical/operators/ScanJsonTest.java ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/physical/operators/ScanJsonTest.java b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/physical/operators/ScanJsonTest.java index 2f0b1a5..7911a45 100644 --- a/sandbox/plan-parser/src/test/java/org/apache/drill/plan/physical/operators/ScanJsonTest.java +++ b/sandbox/plan-parser/src/test/java/org/apache/drill/plan/physical/operators/ScanJsonTest.java @@ -1,11 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.drill.plan.physical.operators; -/** - * Created with IntelliJ IDEA. - * User: tdunning - * Date: 10/15/12 - * Time: 5:07 PM - * To change this template use File | Settings | File Templates. 
- */ +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import org.junit.Test; + +import java.io.IOException; + public class ScanJsonTest { + @Test + public void test1() throws IOException { + ScanJson in = ScanJson.create(Resources.newReaderSupplier(Resources.getResource("data1.json"), Charsets.UTF_8)); + in.call(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a88e2620/sandbox/plan-parser/src/test/resources/data1.json ---------------------------------------------------------------------- diff --git a/sandbox/plan-parser/src/test/resources/data1.json b/sandbox/plan-parser/src/test/resources/data1.json index e69de29..ddfaa50 100644 --- a/sandbox/plan-parser/src/test/resources/data1.json +++ b/sandbox/plan-parser/src/test/resources/data1.json @@ -0,0 +1,4 @@ +{"a":1, "b":2} +{"a":2, "b":2} +{"a":3, "b":2} +{"a":4, "b":2}
