Add two basic repeated functions: repeated_contains and repeated_count.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bd41633f Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bd41633f Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bd41633f Branch: refs/heads/master Commit: bd41633f1e2f088e0a9ba97378d77f56e2a00cdd Parents: ecb80fc Author: Jacques Nadeau <[email protected]> Authored: Thu Aug 8 12:10:44 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 8 12:10:44 2013 -0700 ---------------------------------------------------------------------- .../expr/fn/impl/SimpleRepeatedFunctions.java | 100 +++++++++++++++++++ .../exec/fn/impl/TestRepeatedFunction.java | 96 ++++++++++++++++++ .../src/test/resources/physical_repeated_1.json | 37 +++++++ 3 files changed, 233 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bd41633f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SimpleRepeatedFunctions.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SimpleRepeatedFunctions.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SimpleRepeatedFunctions.java new file mode 100644 index 0000000..86f0040 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SimpleRepeatedFunctions.java @@ -0,0 +1,100 @@ +package org.apache.drill.exec.expr.fn.impl; + +import org.apache.drill.common.expression.Arg; +import org.apache.drill.common.expression.ArgumentValidators; +import org.apache.drill.common.expression.BasicArgumentValidator; +import org.apache.drill.common.expression.CallProvider; +import org.apache.drill.common.expression.FunctionDefinition; +import org.apache.drill.common.expression.OutputTypeDeterminer; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.DrillFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.vector.BigIntHolder; +import org.apache.drill.exec.vector.BitHolder; +import org.apache.drill.exec.vector.IntHolder; +import org.apache.drill.exec.vector.RepeatedBigIntHolder; +import org.apache.drill.exec.vector.RepeatedIntHolder; + +public class SimpleRepeatedFunctions { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MathFunctions.class); + + private SimpleRepeatedFunctions() { + } + + @FunctionTemplate(name = "repeated_count", scope = FunctionTemplate.FunctionScope.SIMPLE) + public static class RepeatedLengthBigInt implements DrillFunc { + + @Param + RepeatedBigIntHolder input; + @Output + IntHolder out; + + public void setup(RecordBatch b) { + } + + public void eval() { + out.value = input.end - input.start; + } + } + + @FunctionTemplate(name = "repeated_count", scope = FunctionTemplate.FunctionScope.SIMPLE) + public static class RepeatedLengthInt implements DrillFunc { + + @Param RepeatedIntHolder input; + @Output IntHolder out; + + public void setup(RecordBatch b) { + } + + public void eval() { + out.value = input.end - input.start; + } + } + + @FunctionTemplate(name = "repeated_contains", scope = FunctionTemplate.FunctionScope.SIMPLE) + public static class ContainsBigInt implements DrillFunc { + + @Param RepeatedBigIntHolder listToSearch; + @Param BigIntHolder targetValue; + @Output BitHolder out; + + public void setup(RecordBatch b) { + } + + public void eval() { + for (int i = listToSearch.start; i < listToSearch.end; i++) { + if (listToSearch.vector.getAccessor().get(i) == targetValue.value) { + out.value = 1; + break; + } + } + } + + } + + public static class Provider implements CallProvider { + + @Override + public FunctionDefinition[] getFunctionDefintions() { + return new FunctionDefinition[] { + FunctionDefinition.simple("repeated_contains", new BasicArgumentValidator( // + new Arg("repeatedToSearch", // + Types.repeated(MinorType.BIGINT), // + Types.repeated(MinorType.INT)), // + new Arg("targetValue", Types.required(MinorType.BIGINT))), // + OutputTypeDeterminer.FixedType.FIXED_BIT), + + FunctionDefinition.simple( + "repeated_count", + new BasicArgumentValidator(new Arg("repeatedToSearch", Types.repeated(MinorType.BIGINT), Types + .repeated(MinorType.INT))), new OutputTypeDeterminer.FixedType(Types.required(MinorType.INT))) + + }; + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bd41633f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java new file mode 100644 index 0000000..1c51919 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java @@ -0,0 +1,96 @@ +package org.apache.drill.exec.fn.impl; + +import static org.junit.Assert.*; +import mockit.Injectable; +import mockit.NonStrictExpectations; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.impl.ImplCreator; +import org.apache.drill.exec.physical.impl.SimpleRootExec; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.vector.BitVector; +import org.apache.drill.exec.vector.IntVector; +import org.junit.AfterClass; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.yammer.metrics.MetricRegistry; + +public class TestRepeatedFunction { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestRepeatedFunction.class); + DrillConfig c = DrillConfig.create(); + + + @Test + public void testRepeated(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{ +// System.out.println(System.getProperty("java.class.path")); + + + new NonStrictExpectations(){{ + bitContext.getMetrics(); result = new MetricRegistry("test"); + bitContext.getAllocator(); result = BufferAllocator.getAllocator(c); + }}; + + + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_repeated_1.json"), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + + boolean oneIsOne = false; + int size = 0; + int[] sizes = {1,2,0,6}; + + while(exec.next()){ + IntVector c1 = exec.getValueVectorById(new SchemaPath("cnt", ExpressionPosition.UNKNOWN), IntVector.class); + BitVector c2 = exec.getValueVectorById(new SchemaPath("has_min", ExpressionPosition.UNKNOWN), BitVector.class); + + for(int i =0; i < exec.getRecordCount(); i++){ + int curSize = sizes[size % sizes.length]; + assertEquals(curSize, c1.getAccessor().get(i)); + switch(curSize){ + case 1: + assertEquals(oneIsOne, 1 == c2.getAccessor().get(i)); + oneIsOne = !oneIsOne; + break; + case 2: + assertEquals(1, c2.getAccessor().get(i)); + break; + case 0: + assertEquals(0, c2.getAccessor().get(i)); + break; + case 6: + assertEquals(1, c2.getAccessor().get(i)); + break; + } + size++; + } + } + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + + } + + @AfterClass + public static void tearDown() throws Exception{ + // pause to get logger to catch up. + Thread.sleep(1000); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bd41633f/sandbox/prototype/exec/java-exec/src/test/resources/physical_repeated_1.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_repeated_1.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_repeated_1.json new file mode 100644 index 0000000..c26be01 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_repeated_1.json @@ -0,0 +1,37 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REPEATED"}, + {name: "red", type: "BIGINT", mode: "REPEATED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + child: 1, + pop:"project", + exprs: [ + { ref: "cnt", expr:"repeated_count(blue)" }, + { ref: "has_min", expr:"repeated_contains(red, 9223372036854775807)" } + ] + }, + { + @id: 3, + child: 2, + pop: "screen" + } + ] +} \ No newline at end of file
