http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java index 8a24add..43a8618 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java @@ -48,7 +48,7 @@ public class LogicalPlanEqualityTester implements LogicalPlanRewriteRule { public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { LogicalNode root = plan.getRootBlock().getRoot(); PlanProto.LogicalNodeTree serialized = LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot()); - LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, serialized); + LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, null, serialized); assert root.deepEquals(deserialized); return plan; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index c49d51b..d8b6380 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -86,7 +86,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { public PartitionPathFilter(Schema schema, EvalNode partitionFilter) { this.schema = schema; this.partitionFilter = partitionFilter; - partitionFilter.bind(schema); + partitionFilter.bind(null, schema); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java index 5a96054..3ca76ee 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java @@ -35,10 +35,11 @@ import org.apache.tajo.datum.*; import org.apache.tajo.exception.InternalException; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.function.AggFunction; -import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.logical.WindowSpec; import org.apache.tajo.plan.serder.PlanProto.WinFunctionEvalSpec; +import java.io.IOException; import java.util.*; /** @@ -52,7 +53,7 @@ import java.util.*; */ public class EvalNodeDeserializer { - public static EvalNode deserialize(OverridableConf context, PlanProto.EvalNodeTree tree) { + public static EvalNode deserialize(OverridableConf context, EvalContext evalContext, PlanProto.EvalNodeTree tree) { Map<Integer, EvalNode> evalNodeMap = Maps.newHashMap(); // sort serialized eval nodes in an ascending order of their IDs. @@ -180,9 +181,10 @@ public class EvalNodeDeserializer { try { funcDesc = new FunctionDesc(funcProto.getFuncion()); if (type == EvalType.FUNCTION) { - GeneralFunction instance = (GeneralFunction) funcDesc.newInstance(); - current = new GeneralFunctionEval(context, new FunctionDesc(funcProto.getFuncion()), instance, params); - + current = new GeneralFunctionEval(context, funcDesc, params); + if (evalContext != null && funcDesc.getInvocation().hasPython()) { + evalContext.addScriptEngine(current, new PythonScriptEngine(funcDesc)); + } } else if (type == EvalType.AGG_FUNCTION || type == EvalType.WINDOW_FUNCTION) { AggFunction instance = (AggFunction) funcDesc.newInstance(); if (type == EvalType.AGG_FUNCTION) { @@ -228,6 +230,8 @@ public class EvalNodeDeserializer { throw new NoSuchFunctionException(functionName, parameterTypes); } catch (InternalException ie) { throw new NoSuchFunctionException(funcDesc.getFunctionName(), funcDesc.getParamTypes()); + } catch (IOException e) { + throw new NoSuchFunctionException(e.getMessage()); } } else { throw new RuntimeException("Unknown EvalType: " + type.name()); @@ -309,6 +313,8 @@ public class EvalNodeDeserializer { return new IntervalDatum(datum.getInterval().getMonth(), datum.getInterval().getMsec()); case NULL_TYPE: return NullDatum.get(); + case ANY: + return DatumFactory.createAny(deserialize(datum.getActual())); default: throw new RuntimeException("Unknown data type: " + datum.getType().name()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java index c7702c5..e47d620 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java @@ -24,6 +24,7 @@ import com.google.protobuf.ByteString; import org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType; import org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.datum.AnyDatum; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.plan.expr.*; @@ -181,6 +182,7 @@ public class EvalNodeSerializer return rowConst; } + @Override public EvalNode visitField(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, FieldEval field) { PlanProto.EvalNode.Builder builder = createEvalBuilder(context, field); builder.setField(field.getColumnRef().getProto()); @@ -188,6 +190,7 @@ public class EvalNodeSerializer return field; } + @Override public EvalNode visitBetween(EvalTreeProtoBuilderContext context, BetweenPredicateEval between, Stack<EvalNode> stack) { // visiting and registering childs @@ -211,6 +214,7 @@ public class EvalNodeSerializer return between; } + @Override public EvalNode visitCaseWhen(EvalTreeProtoBuilderContext context, CaseWhenEval caseWhen, Stack<EvalNode> stack) { // visiting and registering childs super.visitCaseWhen(context, caseWhen, stack); @@ -235,6 +239,7 @@ public class EvalNodeSerializer return caseWhen; } + @Override public EvalNode visitIfThen(EvalTreeProtoBuilderContext context, CaseWhenEval.IfThenEval ifCond, Stack<EvalNode> stack) { // visiting and registering childs @@ -254,6 +259,7 @@ public class EvalNodeSerializer return ifCond; } + @Override public EvalNode visitFuncCall(EvalTreeProtoBuilderContext context, FunctionEval function, Stack<EvalNode> stack) { // visiting and registering childs super.visitFuncCall(context, function, stack); @@ -297,7 +303,6 @@ public class EvalNodeSerializer builder.setWinFunction(windowFuncBuilder); } - context.treeBuilder.addNodes(builder); return function; } @@ -388,6 +393,9 @@ public class EvalNodeSerializer intervalBuilder.setMsec(interval.getMilliSeconds()); builder.setInterval(intervalBuilder); break; + case ANY: + builder.setActual(serialize(((AnyDatum)datum).getActual())); + break; default: throw new RuntimeException("Unknown data type: " + datum.type().name()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index f96626b..84991bb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import org.apache.hadoop.fs.Path; import org.apache.tajo.OverridableConf; import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -31,10 +32,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.AggregationFunctionCallEval; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.expr.FieldEval; -import org.apache.tajo.plan.expr.WindowFunctionEval; +import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -58,7 +56,8 @@ public class LogicalNodeDeserializer { * @param tree LogicalNodeTree which contains a list of serialized logical nodes. * @return A logical node tree */ - public static LogicalNode deserialize(OverridableConf context, PlanProto.LogicalNodeTree tree) { + public static LogicalNode deserialize(OverridableConf context, @Nullable EvalContext evalContext, + PlanProto.LogicalNodeTree tree) { Map<Integer, LogicalNode> nodeMap = Maps.newHashMap(); // sort serialized logical nodes in an ascending order of their sids @@ -88,10 +87,10 @@ public class LogicalNodeDeserializer { current = convertSetSession(protoNode); break; case EXPRS: - current = convertEvalExpr(context, protoNode); + current = convertEvalExpr(context, evalContext, protoNode); break; case PROJECTION: - current = convertProjection(context, nodeMap, protoNode); + current = convertProjection(context, evalContext, nodeMap, protoNode); break; case LIMIT: current = convertLimit(nodeMap, protoNode); @@ -100,34 +99,34 @@ public class LogicalNodeDeserializer { current = convertSort(nodeMap, protoNode); break; case WINDOW_AGG: - current = convertWindowAgg(context, nodeMap, protoNode); + current = convertWindowAgg(context, evalContext, nodeMap, protoNode); break; case HAVING: - current = convertHaving(context, nodeMap, protoNode); + current = convertHaving(context, evalContext, nodeMap, protoNode); break; case GROUP_BY: - current = convertGroupby(context, nodeMap, protoNode); + current = convertGroupby(context, evalContext, nodeMap, protoNode); break; case DISTINCT_GROUP_BY: - current = convertDistinctGroupby(context, nodeMap, protoNode); + current = convertDistinctGroupby(context, evalContext, nodeMap, protoNode); break; case SELECTION: - current = convertFilter(context, nodeMap, protoNode); + current = convertFilter(context, evalContext, nodeMap, protoNode); break; case JOIN: - current = convertJoin(context, nodeMap, protoNode); + current = convertJoin(context, evalContext, nodeMap, protoNode); break; case TABLE_SUBQUERY: - current = convertTableSubQuery(context, nodeMap, protoNode); + current = convertTableSubQuery(context, evalContext, nodeMap, protoNode); break; case UNION: current = convertUnion(nodeMap, protoNode); break; case PARTITIONS_SCAN: - current = convertPartitionScan(context, protoNode); + current = convertPartitionScan(context, evalContext, protoNode); break; case SCAN: - current = convertScan(context, protoNode); + current = convertScan(context, evalContext, protoNode); break; case CREATE_TABLE: @@ -192,22 +191,25 @@ public class LogicalNodeDeserializer { return setSession; } - private static EvalExprNode convertEvalExpr(OverridableConf context, PlanProto.LogicalNode protoNode) { + private static EvalExprNode convertEvalExpr(OverridableConf context, EvalContext evalContext, + PlanProto.LogicalNode protoNode) { PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval(); EvalExprNode evalExpr = new EvalExprNode(protoNode.getNodeId()); evalExpr.setInSchema(convertSchema(protoNode.getInSchema())); - evalExpr.setTargets(convertTargets(context, evalExprProto.getTargetsList())); + evalExpr.setTargets(convertTargets(context, evalContext, evalExprProto.getTargetsList())); return evalExpr; } - private static ProjectionNode convertProjection(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static ProjectionNode convertProjection(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { PlanProto.ProjectionNode projectionProto = protoNode.getProjection(); ProjectionNode projectionNode = new ProjectionNode(protoNode.getNodeId()); - projectionNode.init(projectionProto.getDistinct(), convertTargets(context, projectionProto.getTargetsList())); + projectionNode.init(projectionProto.getDistinct(), convertTargets(context, evalContext, + projectionProto.getTargetsList())); projectionNode.setChild(nodeMap.get(projectionProto.getChildSeq())); projectionNode.setInSchema(convertSchema(protoNode.getInSchema())); projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema())); @@ -239,21 +241,22 @@ public class LogicalNodeDeserializer { return sortNode; } - private static HavingNode convertHaving(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static HavingNode convertHaving(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { PlanProto.FilterNode havingProto = protoNode.getFilter(); HavingNode having = new HavingNode(protoNode.getNodeId()); having.setChild(nodeMap.get(havingProto.getChildSeq())); - having.setQual(EvalNodeDeserializer.deserialize(context, havingProto.getQual())); + having.setQual(EvalNodeDeserializer.deserialize(context, evalContext, havingProto.getQual())); having.setInSchema(convertSchema(protoNode.getInSchema())); having.setOutSchema(convertSchema(protoNode.getOutSchema())); return having; } - private static WindowAggNode convertWindowAgg(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static WindowAggNode convertWindowAgg(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg(); WindowAggNode windowAgg = new WindowAggNode(protoNode.getNodeId()); @@ -264,7 +267,8 @@ public class LogicalNodeDeserializer { } if (windowAggProto.getWindowFunctionsCount() > 0) { - windowAgg.setWindowFunctions(convertWindowFunccEvals(context, windowAggProto.getWindowFunctionsList())); + windowAgg.setWindowFunctions(convertWindowFunccEvals(context, evalContext, + windowAggProto.getWindowFunctionsList())); } windowAgg.setDistinct(windowAggProto.getDistinct()); @@ -274,7 +278,7 @@ public class LogicalNodeDeserializer { } if (windowAggProto.getTargetsCount() > 0) { - windowAgg.setTargets(convertTargets(context, windowAggProto.getTargetsList())); + windowAgg.setTargets(convertTargets(context, evalContext, windowAggProto.getTargetsList())); } windowAgg.setInSchema(convertSchema(protoNode.getInSchema())); @@ -283,8 +287,8 @@ public class LogicalNodeDeserializer { return windowAgg; } - private static GroupbyNode convertGroupby(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static GroupbyNode convertGroupby(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby(); GroupbyNode groupby = new GroupbyNode(protoNode.getNodeId()); @@ -295,10 +299,10 @@ public class LogicalNodeDeserializer { groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList())); } if (groupbyProto.getAggFunctionsCount() > 0) { - groupby.setAggFunctions(convertAggFuncCallEvals(context, groupbyProto.getAggFunctionsList())); + groupby.setAggFunctions(convertAggFuncCallEvals(context, evalContext, groupbyProto.getAggFunctionsList())); } if (groupbyProto.getTargetsCount() > 0) { - groupby.setTargets(convertTargets(context, groupbyProto.getTargetsList())); + groupby.setTargets(convertTargets(context, evalContext, groupbyProto.getTargetsList())); } groupby.setInSchema(convertSchema(protoNode.getInSchema())); @@ -307,21 +311,23 @@ public class LogicalNodeDeserializer { return groupby; } - private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { PlanProto.DistinctGroupbyNode distinctGroupbyProto = protoNode.getDistinctGroupby(); DistinctGroupbyNode distinctGroupby = new DistinctGroupbyNode(protoNode.getNodeId()); distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildSeq())); if (distinctGroupbyProto.hasGroupbyNode()) { - distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, distinctGroupbyProto.getGroupbyNode())); + distinctGroupby.setGroupbyPlan(convertGroupby(context, evalContext, nodeMap, + distinctGroupbyProto.getGroupbyNode())); } if (distinctGroupbyProto.getSubPlansCount() > 0) { List<GroupbyNode> subPlans = TUtil.newList(); for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) { - subPlans.add(convertGroupby(context, nodeMap, distinctGroupbyProto.getSubPlans(i))); + subPlans.add(convertGroupby(context, evalContext, nodeMap, distinctGroupbyProto.getSubPlans(i))); } distinctGroupby.setSubPlans(subPlans); } @@ -330,10 +336,11 @@ public class LogicalNodeDeserializer { distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList())); } if (distinctGroupbyProto.getAggFunctionsCount() > 0) { - distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, distinctGroupbyProto.getAggFunctionsList())); + distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, evalContext, + distinctGroupbyProto.getAggFunctionsList())); } if (distinctGroupbyProto.getTargetsCount() > 0) { - distinctGroupby.setTargets(convertTargets(context, distinctGroupbyProto.getTargetsList())); + distinctGroupby.setTargets(convertTargets(context, evalContext, distinctGroupbyProto.getTargetsList())); } int [] resultColumnIds = new int[distinctGroupbyProto.getResultIdCount()]; for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) { @@ -348,8 +355,8 @@ public class LogicalNodeDeserializer { return distinctGroupby; } - private static JoinNode convertJoin(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static JoinNode convertJoin(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { PlanProto.JoinNode joinProto = protoNode.getJoin(); JoinNode join = new JoinNode(protoNode.getNodeId()); @@ -359,24 +366,24 @@ public class LogicalNodeDeserializer { join.setInSchema(convertSchema(protoNode.getInSchema())); join.setOutSchema(convertSchema(protoNode.getOutSchema())); if (joinProto.hasJoinQual()) { - join.setJoinQual(EvalNodeDeserializer.deserialize(context, joinProto.getJoinQual())); + join.setJoinQual(EvalNodeDeserializer.deserialize(context, evalContext, joinProto.getJoinQual())); } if (joinProto.getExistsTargets()) { - join.setTargets(convertTargets(context, joinProto.getTargetsList())); + join.setTargets(convertTargets(context, evalContext, joinProto.getTargetsList())); } return join; } - private static SelectionNode convertFilter(OverridableConf context, Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static SelectionNode convertFilter(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { PlanProto.FilterNode filterProto = protoNode.getFilter(); SelectionNode selection = new SelectionNode(protoNode.getNodeId()); selection.setInSchema(convertSchema(protoNode.getInSchema())); selection.setOutSchema(convertSchema(protoNode.getOutSchema())); selection.setChild(nodeMap.get(filterProto.getChildSeq())); - selection.setQual(EvalNodeDeserializer.deserialize(context, filterProto.getQual())); + selection.setQual(EvalNodeDeserializer.deserialize(context, evalContext, filterProto.getQual())); return selection; } @@ -393,14 +400,15 @@ public class LogicalNodeDeserializer { return union; } - private static ScanNode convertScan(OverridableConf context, PlanProto.LogicalNode protoNode) { + private static ScanNode convertScan(OverridableConf context, EvalContext evalContext, PlanProto.LogicalNode protoNode) { ScanNode scan = new ScanNode(protoNode.getNodeId()); - fillScanNode(context, protoNode, scan); + fillScanNode(context, evalContext, protoNode, scan); return scan; } - private static void fillScanNode(OverridableConf context, PlanProto.LogicalNode protoNode, ScanNode scan) { + private static void fillScanNode(OverridableConf context, EvalContext evalContext, PlanProto.LogicalNode protoNode, + ScanNode scan) { PlanProto.ScanNode scanProto = protoNode.getScan(); if (scanProto.hasAlias()) { scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias()); @@ -409,11 +417,11 @@ public class LogicalNodeDeserializer { } if (scanProto.getExistTargets()) { - scan.setTargets(convertTargets(context, scanProto.getTargetsList())); + scan.setTargets(convertTargets(context, evalContext, scanProto.getTargetsList())); } if (scanProto.hasQual()) { - scan.setQual(EvalNodeDeserializer.deserialize(context, scanProto.getQual())); + scan.setQual(EvalNodeDeserializer.deserialize(context, evalContext, scanProto.getQual())); } if(scanProto.hasBroadcast()){ @@ -423,9 +431,10 @@ public class LogicalNodeDeserializer { scan.setOutSchema(convertSchema(protoNode.getOutSchema())); } - private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, PlanProto.LogicalNode protoNode) { + private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, EvalContext evalContext, + PlanProto.LogicalNode protoNode) { PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); - fillScanNode(context, protoNode, partitionedScan); + fillScanNode(context, evalContext, protoNode, partitionedScan); PlanProto.PartitionScanSpec partitionScanProto = protoNode.getPartitionScan(); Path [] paths = new Path[partitionScanProto.getPathsCount()]; @@ -436,16 +445,16 @@ public class LogicalNodeDeserializer { return partitionedScan; } - private static TableSubQueryNode convertTableSubQuery(OverridableConf context, - Map<Integer, LogicalNode> nodeMap, - PlanProto.LogicalNode protoNode) { + private static TableSubQueryNode convertTableSubQuery(OverridableConf context, EvalContext evalContext, + Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery(); TableSubQueryNode tableSubQuery = new TableSubQueryNode(protoNode.getNodeId()); tableSubQuery.init(proto.getTableName(), nodeMap.get(proto.getChildSeq())); tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema())); if (proto.getTargetsCount() > 0) { - tableSubQuery.setTargets(convertTargets(context, proto.getTargetsList())); + tableSubQuery.setTargets(convertTargets(context, evalContext, proto.getTargetsList())); } return tableSubQuery; @@ -602,20 +611,21 @@ public class LogicalNodeDeserializer { return truncateTable; } - private static AggregationFunctionCallEval [] convertAggFuncCallEvals(OverridableConf context, + private static AggregationFunctionCallEval [] convertAggFuncCallEvals(OverridableConf context, EvalContext evalContext, List<PlanProto.EvalNodeTree> evalTrees) { AggregationFunctionCallEval [] aggFuncs = new AggregationFunctionCallEval[evalTrees.size()]; for (int i = 0; i < aggFuncs.length; i++) { - aggFuncs[i] = (AggregationFunctionCallEval) EvalNodeDeserializer.deserialize(context, evalTrees.get(i)); + aggFuncs[i] = (AggregationFunctionCallEval) EvalNodeDeserializer.deserialize(context, evalContext, + evalTrees.get(i)); } return aggFuncs; } - private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf context, - List<PlanProto.EvalNodeTree> evalTrees) { + private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf context, EvalContext evalContext, + List<PlanProto.EvalNodeTree> evalTrees) { WindowFunctionEval[] winFuncEvals = new WindowFunctionEval[evalTrees.size()]; for (int i = 0; i < winFuncEvals.length; i++) { - winFuncEvals[i] = (WindowFunctionEval) EvalNodeDeserializer.deserialize(context, evalTrees.get(i)); + winFuncEvals[i] = (WindowFunctionEval) EvalNodeDeserializer.deserialize(context, evalContext, evalTrees.get(i)); } return winFuncEvals; } @@ -632,11 +642,12 @@ public class LogicalNodeDeserializer { return columns; } - public static Target[] convertTargets(OverridableConf context, List<PlanProto.Target> targetsProto) { + public static Target[] convertTargets(OverridableConf context, EvalContext evalContext, + List<PlanProto.Target> targetsProto) { Target [] targets = new Target[targetsProto.size()]; for (int i = 0; i < targets.length; i++) { PlanProto.Target targetProto = targetsProto.get(i); - EvalNode evalNode = EvalNodeDeserializer.deserialize(context, targetProto.getExpr()); + EvalNode evalNode = EvalNodeDeserializer.deserialize(context, evalContext, targetProto.getExpr()); if (targetProto.hasAlias()) { targets[i] = new Target(evalNode, targetProto.getAlias()); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index c555a1a..de949d5 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -330,25 +330,26 @@ enum EvalType { WINDOW_FUNCTION = 18; AGG_FUNCTION = 19; FUNCTION = 20; + PYTHON_FUNCTION = 21; // String operator or pattern matching predicates - LIKE = 21; - SIMILAR_TO = 22; - REGEX = 23; - CONCATENATE = 24; + LIKE = 22; + SIMILAR_TO = 23; + REGEX = 24; + CONCATENATE = 25; // Other predicates - BETWEEN = 25; - CASE = 26; - IF_THEN = 27; - IN = 28; + BETWEEN = 26; + CASE = 27; + IF_THEN = 28; + IN = 29; // Value or Reference - SIGNED = 29; - CAST = 30; - ROW_CONSTANT = 31; - FIELD = 32; - CONST = 33; + SIGNED = 30; + CAST = 31; + ROW_CONSTANT = 32; + FIELD = 33; + CONST = 34; } message EvalNodeTree { @@ -476,6 +477,7 @@ message Datum { optional string text = 7; optional bytes blob = 8; optional Interval interval = 12; + optional Datum actual = 13; // for ANY type datum } message Interval { http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java index fccaf2a..9e7f334 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java @@ -65,7 +65,7 @@ public class TestLazyTuple { sb.append(DatumFactory.createFloat4(77.9f)).append('|'); sb.append(DatumFactory.createFloat8(271.9f)).append('|'); sb.append(DatumFactory.createText("str2")).append('|'); - sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|'); + sb.append(DatumFactory.createBlob("jinho")).append('|'); sb.append(DatumFactory.createInet4("192.168.0.1")).append('|'); sb.append(new String(nullbytes)).append('|'); sb.append(NullDatum.get()); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json index 739dfe7..d3aee33 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json @@ -1,6 +1,6 @@ -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} \ No newline at end of file +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json index 8ee3408..ec31982 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json @@ -1 +1 @@ -{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"}
