Repository: tajo
Updated Branches:
  refs/heads/master 14a1e536c -> a74538539


http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
index 8a24add..43a8618 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
@@ -48,7 +48,7 @@ public class LogicalPlanEqualityTester implements 
LogicalPlanRewriteRule {
   public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
     LogicalNode root = plan.getRootBlock().getRoot();
     PlanProto.LogicalNodeTree serialized = 
LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot());
-    LogicalNode deserialized = 
LogicalNodeDeserializer.deserialize(queryContext, serialized);
+    LogicalNode deserialized = 
LogicalNodeDeserializer.deserialize(queryContext, null, serialized);
     assert root.deepEquals(deserialized);
     return plan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index c49d51b..d8b6380 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -86,7 +86,7 @@ public class PartitionedTableRewriter implements 
LogicalPlanRewriteRule {
     public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
       this.schema = schema;
       this.partitionFilter = partitionFilter;
-      partitionFilter.bind(schema);
+      partitionFilter.bind(null, schema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
index 5a96054..3ca76ee 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
@@ -35,10 +35,11 @@ import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.function.AggFunction;
-import org.apache.tajo.plan.function.GeneralFunction;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.plan.logical.WindowSpec;
 import org.apache.tajo.plan.serder.PlanProto.WinFunctionEvalSpec;
 
+import java.io.IOException;
 import java.util.*;
 
 /**
@@ -52,7 +53,7 @@ import java.util.*;
  */
 public class EvalNodeDeserializer {
 
-  public static EvalNode deserialize(OverridableConf context, 
PlanProto.EvalNodeTree tree) {
+  public static EvalNode deserialize(OverridableConf context, EvalContext 
evalContext, PlanProto.EvalNodeTree tree) {
     Map<Integer, EvalNode> evalNodeMap = Maps.newHashMap();
 
     // sort serialized eval nodes in an ascending order of their IDs.
@@ -180,9 +181,10 @@ public class EvalNodeDeserializer {
         try {
           funcDesc = new FunctionDesc(funcProto.getFuncion());
           if (type == EvalType.FUNCTION) {
-            GeneralFunction instance = (GeneralFunction) 
funcDesc.newInstance();
-            current = new GeneralFunctionEval(context, new 
FunctionDesc(funcProto.getFuncion()), instance, params);
-
+            current = new GeneralFunctionEval(context, funcDesc, params);
+            if (evalContext != null && funcDesc.getInvocation().hasPython()) {
+              evalContext.addScriptEngine(current, new 
PythonScriptEngine(funcDesc));
+            }
           } else if (type == EvalType.AGG_FUNCTION || type == 
EvalType.WINDOW_FUNCTION) {
             AggFunction instance = (AggFunction) funcDesc.newInstance();
             if (type == EvalType.AGG_FUNCTION) {
@@ -228,6 +230,8 @@ public class EvalNodeDeserializer {
           throw new NoSuchFunctionException(functionName, parameterTypes);
         } catch (InternalException ie) {
           throw new NoSuchFunctionException(funcDesc.getFunctionName(), 
funcDesc.getParamTypes());
+        } catch (IOException e) {
+          throw new NoSuchFunctionException(e.getMessage());
         }
       } else {
         throw new RuntimeException("Unknown EvalType: " + type.name());
@@ -309,6 +313,8 @@ public class EvalNodeDeserializer {
       return new IntervalDatum(datum.getInterval().getMonth(), 
datum.getInterval().getMsec());
     case NULL_TYPE:
       return NullDatum.get();
+    case ANY:
+      return DatumFactory.createAny(deserialize(datum.getActual()));
     default:
       throw new RuntimeException("Unknown data type: " + 
datum.getType().name());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
index c7702c5..e47d620 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
 import org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType;
 import org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.datum.AnyDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.plan.expr.*;
@@ -181,6 +182,7 @@ public class EvalNodeSerializer
     return rowConst;
   }
 
+  @Override
   public EvalNode visitField(EvalTreeProtoBuilderContext context, 
Stack<EvalNode> stack, FieldEval field) {
     PlanProto.EvalNode.Builder builder = createEvalBuilder(context, field);
     builder.setField(field.getColumnRef().getProto());
@@ -188,6 +190,7 @@ public class EvalNodeSerializer
     return field;
   }
 
+  @Override
   public EvalNode visitBetween(EvalTreeProtoBuilderContext context, 
BetweenPredicateEval between,
                                Stack<EvalNode> stack) {
     // visiting and registering childs
@@ -211,6 +214,7 @@ public class EvalNodeSerializer
     return between;
   }
 
+  @Override
   public EvalNode visitCaseWhen(EvalTreeProtoBuilderContext context, 
CaseWhenEval caseWhen, Stack<EvalNode> stack) {
     // visiting and registering childs
     super.visitCaseWhen(context, caseWhen, stack);
@@ -235,6 +239,7 @@ public class EvalNodeSerializer
     return caseWhen;
   }
 
+  @Override
   public EvalNode visitIfThen(EvalTreeProtoBuilderContext context, 
CaseWhenEval.IfThenEval ifCond,
                               Stack<EvalNode> stack) {
     // visiting and registering childs
@@ -254,6 +259,7 @@ public class EvalNodeSerializer
     return ifCond;
   }
 
+  @Override
   public EvalNode visitFuncCall(EvalTreeProtoBuilderContext context, 
FunctionEval function, Stack<EvalNode> stack) {
     // visiting and registering childs
     super.visitFuncCall(context, function, stack);
@@ -297,7 +303,6 @@ public class EvalNodeSerializer
       builder.setWinFunction(windowFuncBuilder);
     }
 
-
     context.treeBuilder.addNodes(builder);
     return function;
   }
@@ -388,6 +393,9 @@ public class EvalNodeSerializer
       intervalBuilder.setMsec(interval.getMilliSeconds());
       builder.setInterval(intervalBuilder);
       break;
+    case ANY:
+      builder.setActual(serialize(((AnyDatum)datum).getActual()));
+      break;
     default:
       throw new RuntimeException("Unknown data type: " + datum.type().name());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index f96626b..84991bb 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -31,10 +32,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.plan.Target;
-import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.expr.FieldEval;
-import org.apache.tajo.plan.expr.WindowFunctionEval;
+import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -58,7 +56,8 @@ public class LogicalNodeDeserializer {
    * @param tree LogicalNodeTree which contains a list of serialized logical 
nodes.
    * @return A logical node tree
    */
-  public static LogicalNode deserialize(OverridableConf context, 
PlanProto.LogicalNodeTree tree) {
+  public static LogicalNode deserialize(OverridableConf context, @Nullable 
EvalContext evalContext,
+                                        PlanProto.LogicalNodeTree tree) {
     Map<Integer, LogicalNode> nodeMap = Maps.newHashMap();
 
     // sort serialized logical nodes in an ascending order of their sids
@@ -88,10 +87,10 @@ public class LogicalNodeDeserializer {
         current = convertSetSession(protoNode);
         break;
       case EXPRS:
-        current = convertEvalExpr(context, protoNode);
+        current = convertEvalExpr(context, evalContext, protoNode);
         break;
       case PROJECTION:
-        current = convertProjection(context, nodeMap, protoNode);
+        current = convertProjection(context, evalContext, nodeMap, protoNode);
         break;
       case LIMIT:
         current = convertLimit(nodeMap, protoNode);
@@ -100,34 +99,34 @@ public class LogicalNodeDeserializer {
         current = convertSort(nodeMap, protoNode);
         break;
       case WINDOW_AGG:
-        current = convertWindowAgg(context, nodeMap, protoNode);
+        current = convertWindowAgg(context, evalContext, nodeMap, protoNode);
         break;
       case HAVING:
-        current = convertHaving(context, nodeMap, protoNode);
+        current = convertHaving(context, evalContext, nodeMap, protoNode);
         break;
       case GROUP_BY:
-        current = convertGroupby(context, nodeMap, protoNode);
+        current = convertGroupby(context, evalContext, nodeMap, protoNode);
         break;
       case DISTINCT_GROUP_BY:
-        current = convertDistinctGroupby(context, nodeMap, protoNode);
+        current = convertDistinctGroupby(context, evalContext, nodeMap, 
protoNode);
         break;
       case SELECTION:
-        current = convertFilter(context, nodeMap, protoNode);
+        current = convertFilter(context, evalContext, nodeMap, protoNode);
         break;
       case JOIN:
-        current = convertJoin(context, nodeMap, protoNode);
+        current = convertJoin(context, evalContext, nodeMap, protoNode);
         break;
       case TABLE_SUBQUERY:
-        current = convertTableSubQuery(context, nodeMap, protoNode);
+        current = convertTableSubQuery(context, evalContext, nodeMap, 
protoNode);
         break;
       case UNION:
         current = convertUnion(nodeMap, protoNode);
         break;
       case PARTITIONS_SCAN:
-        current = convertPartitionScan(context, protoNode);
+        current = convertPartitionScan(context, evalContext, protoNode);
         break;
       case SCAN:
-        current = convertScan(context, protoNode);
+        current = convertScan(context, evalContext, protoNode);
         break;
 
       case CREATE_TABLE:
@@ -192,22 +191,25 @@ public class LogicalNodeDeserializer {
     return setSession;
   }
 
-  private static EvalExprNode convertEvalExpr(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
+  private static EvalExprNode convertEvalExpr(OverridableConf context, 
EvalContext evalContext,
+                                              PlanProto.LogicalNode protoNode) 
{
     PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval();
 
     EvalExprNode evalExpr = new EvalExprNode(protoNode.getNodeId());
     evalExpr.setInSchema(convertSchema(protoNode.getInSchema()));
-    evalExpr.setTargets(convertTargets(context, 
evalExprProto.getTargetsList()));
+    evalExpr.setTargets(convertTargets(context, evalContext, 
evalExprProto.getTargetsList()));
 
     return evalExpr;
   }
 
-  private static ProjectionNode convertProjection(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
-                                                 PlanProto.LogicalNode 
protoNode) {
+  private static ProjectionNode convertProjection(OverridableConf context, 
EvalContext evalContext,
+                                                  Map<Integer, LogicalNode> 
nodeMap,
+                                                  PlanProto.LogicalNode 
protoNode) {
     PlanProto.ProjectionNode projectionProto = protoNode.getProjection();
 
     ProjectionNode projectionNode = new ProjectionNode(protoNode.getNodeId());
-    projectionNode.init(projectionProto.getDistinct(), convertTargets(context, 
projectionProto.getTargetsList()));
+    projectionNode.init(projectionProto.getDistinct(), convertTargets(context, 
evalContext,
+        projectionProto.getTargetsList()));
     projectionNode.setChild(nodeMap.get(projectionProto.getChildSeq()));
     projectionNode.setInSchema(convertSchema(protoNode.getInSchema()));
     projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
@@ -239,21 +241,22 @@ public class LogicalNodeDeserializer {
     return sortNode;
   }
 
-  private static HavingNode convertHaving(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
-                                         PlanProto.LogicalNode protoNode) {
+  private static HavingNode convertHaving(OverridableConf context, EvalContext 
evalContext,
+                                          Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
     PlanProto.FilterNode havingProto = protoNode.getFilter();
 
     HavingNode having = new HavingNode(protoNode.getNodeId());
     having.setChild(nodeMap.get(havingProto.getChildSeq()));
-    having.setQual(EvalNodeDeserializer.deserialize(context, 
havingProto.getQual()));
+    having.setQual(EvalNodeDeserializer.deserialize(context, evalContext, 
havingProto.getQual()));
     having.setInSchema(convertSchema(protoNode.getInSchema()));
     having.setOutSchema(convertSchema(protoNode.getOutSchema()));
 
     return having;
   }
 
-  private static WindowAggNode convertWindowAgg(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
-                                               PlanProto.LogicalNode 
protoNode) {
+  private static WindowAggNode convertWindowAgg(OverridableConf context, 
EvalContext evalContext,
+                                                Map<Integer, LogicalNode> 
nodeMap,
+                                                PlanProto.LogicalNode 
protoNode) {
     PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg();
 
     WindowAggNode windowAgg = new WindowAggNode(protoNode.getNodeId());
@@ -264,7 +267,8 @@ public class LogicalNodeDeserializer {
     }
 
     if (windowAggProto.getWindowFunctionsCount() > 0) {
-      windowAgg.setWindowFunctions(convertWindowFunccEvals(context, 
windowAggProto.getWindowFunctionsList()));
+      windowAgg.setWindowFunctions(convertWindowFunccEvals(context, 
evalContext,
+          windowAggProto.getWindowFunctionsList()));
     }
 
     windowAgg.setDistinct(windowAggProto.getDistinct());
@@ -274,7 +278,7 @@ public class LogicalNodeDeserializer {
     }
 
     if (windowAggProto.getTargetsCount() > 0) {
-      windowAgg.setTargets(convertTargets(context, 
windowAggProto.getTargetsList()));
+      windowAgg.setTargets(convertTargets(context, evalContext, 
windowAggProto.getTargetsList()));
     }
 
     windowAgg.setInSchema(convertSchema(protoNode.getInSchema()));
@@ -283,8 +287,8 @@ public class LogicalNodeDeserializer {
     return windowAgg;
   }
 
-  private static GroupbyNode convertGroupby(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
-                                           PlanProto.LogicalNode protoNode) {
+  private static GroupbyNode convertGroupby(OverridableConf context, 
EvalContext evalContext,
+                                            Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
     PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby();
 
     GroupbyNode groupby = new GroupbyNode(protoNode.getNodeId());
@@ -295,10 +299,10 @@ public class LogicalNodeDeserializer {
       
groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList()));
     }
     if (groupbyProto.getAggFunctionsCount() > 0) {
-      groupby.setAggFunctions(convertAggFuncCallEvals(context, 
groupbyProto.getAggFunctionsList()));
+      groupby.setAggFunctions(convertAggFuncCallEvals(context, evalContext, 
groupbyProto.getAggFunctionsList()));
     }
     if (groupbyProto.getTargetsCount() > 0) {
-      groupby.setTargets(convertTargets(context, 
groupbyProto.getTargetsList()));
+      groupby.setTargets(convertTargets(context, evalContext, 
groupbyProto.getTargetsList()));
     }
 
     groupby.setInSchema(convertSchema(protoNode.getInSchema()));
@@ -307,21 +311,23 @@ public class LogicalNodeDeserializer {
     return groupby;
   }
 
-  private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf 
context, Map<Integer, LogicalNode> nodeMap,
-                                           PlanProto.LogicalNode protoNode) {
+  private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf 
context, EvalContext evalContext,
+                                                            Map<Integer, 
LogicalNode> nodeMap,
+                                                            
PlanProto.LogicalNode protoNode) {
     PlanProto.DistinctGroupbyNode distinctGroupbyProto = 
protoNode.getDistinctGroupby();
 
     DistinctGroupbyNode distinctGroupby = new 
DistinctGroupbyNode(protoNode.getNodeId());
     distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildSeq()));
 
     if (distinctGroupbyProto.hasGroupbyNode()) {
-      distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getGroupbyNode()));
+      distinctGroupby.setGroupbyPlan(convertGroupby(context, evalContext, 
nodeMap,
+          distinctGroupbyProto.getGroupbyNode()));
     }
 
     if (distinctGroupbyProto.getSubPlansCount() > 0) {
       List<GroupbyNode> subPlans = TUtil.newList();
       for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) {
-        subPlans.add(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getSubPlans(i)));
+        subPlans.add(convertGroupby(context, evalContext, nodeMap, 
distinctGroupbyProto.getSubPlans(i)));
       }
       distinctGroupby.setSubPlans(subPlans);
     }
@@ -330,10 +336,11 @@ public class LogicalNodeDeserializer {
       
distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList()));
     }
     if (distinctGroupbyProto.getAggFunctionsCount() > 0) {
-      distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
distinctGroupbyProto.getAggFunctionsList()));
+      distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
evalContext,
+          distinctGroupbyProto.getAggFunctionsList()));
     }
     if (distinctGroupbyProto.getTargetsCount() > 0) {
-      distinctGroupby.setTargets(convertTargets(context, 
distinctGroupbyProto.getTargetsList()));
+      distinctGroupby.setTargets(convertTargets(context, evalContext, 
distinctGroupbyProto.getTargetsList()));
     }
     int [] resultColumnIds = new int[distinctGroupbyProto.getResultIdCount()];
     for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) {
@@ -348,8 +355,8 @@ public class LogicalNodeDeserializer {
     return distinctGroupby;
   }
 
-  private static JoinNode convertJoin(OverridableConf context, Map<Integer, 
LogicalNode> nodeMap,
-                                     PlanProto.LogicalNode protoNode) {
+  private static JoinNode convertJoin(OverridableConf context, EvalContext 
evalContext,
+                                      Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
     PlanProto.JoinNode joinProto = protoNode.getJoin();
 
     JoinNode join = new JoinNode(protoNode.getNodeId());
@@ -359,24 +366,24 @@ public class LogicalNodeDeserializer {
     join.setInSchema(convertSchema(protoNode.getInSchema()));
     join.setOutSchema(convertSchema(protoNode.getOutSchema()));
     if (joinProto.hasJoinQual()) {
-      join.setJoinQual(EvalNodeDeserializer.deserialize(context, 
joinProto.getJoinQual()));
+      join.setJoinQual(EvalNodeDeserializer.deserialize(context, evalContext, 
joinProto.getJoinQual()));
     }
     if (joinProto.getExistsTargets()) {
-      join.setTargets(convertTargets(context, joinProto.getTargetsList()));
+      join.setTargets(convertTargets(context, evalContext, 
joinProto.getTargetsList()));
     }
 
     return join;
   }
 
-  private static SelectionNode convertFilter(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
-                                            PlanProto.LogicalNode protoNode) {
+  private static SelectionNode convertFilter(OverridableConf context, 
EvalContext evalContext,
+                                             Map<Integer, LogicalNode> 
nodeMap, PlanProto.LogicalNode protoNode) {
     PlanProto.FilterNode filterProto = protoNode.getFilter();
 
     SelectionNode selection = new SelectionNode(protoNode.getNodeId());
     selection.setInSchema(convertSchema(protoNode.getInSchema()));
     selection.setOutSchema(convertSchema(protoNode.getOutSchema()));
     selection.setChild(nodeMap.get(filterProto.getChildSeq()));
-    selection.setQual(EvalNodeDeserializer.deserialize(context, 
filterProto.getQual()));
+    selection.setQual(EvalNodeDeserializer.deserialize(context, evalContext, 
filterProto.getQual()));
 
     return selection;
   }
@@ -393,14 +400,15 @@ public class LogicalNodeDeserializer {
     return union;
   }
 
-  private static ScanNode convertScan(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
+  private static ScanNode convertScan(OverridableConf context, EvalContext 
evalContext, PlanProto.LogicalNode protoNode) {
     ScanNode scan = new ScanNode(protoNode.getNodeId());
-    fillScanNode(context, protoNode, scan);
+    fillScanNode(context, evalContext, protoNode, scan);
 
     return scan;
   }
 
-  private static void fillScanNode(OverridableConf context, 
PlanProto.LogicalNode protoNode, ScanNode scan) {
+  private static void fillScanNode(OverridableConf context, EvalContext 
evalContext, PlanProto.LogicalNode protoNode,
+                                   ScanNode scan) {
     PlanProto.ScanNode scanProto = protoNode.getScan();
     if (scanProto.hasAlias()) {
       scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias());
@@ -409,11 +417,11 @@ public class LogicalNodeDeserializer {
     }
 
     if (scanProto.getExistTargets()) {
-      scan.setTargets(convertTargets(context, scanProto.getTargetsList()));
+      scan.setTargets(convertTargets(context, evalContext, 
scanProto.getTargetsList()));
     }
 
     if (scanProto.hasQual()) {
-      scan.setQual(EvalNodeDeserializer.deserialize(context, 
scanProto.getQual()));
+      scan.setQual(EvalNodeDeserializer.deserialize(context, evalContext, 
scanProto.getQual()));
     }
 
     if(scanProto.hasBroadcast()){
@@ -423,9 +431,10 @@ public class LogicalNodeDeserializer {
     scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
   }
 
-  private static PartitionedTableScanNode convertPartitionScan(OverridableConf 
context, PlanProto.LogicalNode protoNode) {
+  private static PartitionedTableScanNode convertPartitionScan(OverridableConf 
context, EvalContext evalContext,
+                                                               
PlanProto.LogicalNode protoNode) {
     PartitionedTableScanNode partitionedScan = new 
PartitionedTableScanNode(protoNode.getNodeId());
-    fillScanNode(context, protoNode, partitionedScan);
+    fillScanNode(context, evalContext, protoNode, partitionedScan);
 
     PlanProto.PartitionScanSpec partitionScanProto = 
protoNode.getPartitionScan();
     Path [] paths = new Path[partitionScanProto.getPathsCount()];
@@ -436,16 +445,16 @@ public class LogicalNodeDeserializer {
     return partitionedScan;
   }
 
-  private static TableSubQueryNode convertTableSubQuery(OverridableConf 
context,
-                                                                 Map<Integer, 
LogicalNode> nodeMap,
-                                                                 
PlanProto.LogicalNode protoNode) {
+  private static TableSubQueryNode convertTableSubQuery(OverridableConf 
context, EvalContext evalContext,
+                                                        Map<Integer, 
LogicalNode> nodeMap,
+                                                        PlanProto.LogicalNode 
protoNode) {
     PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery();
 
     TableSubQueryNode tableSubQuery = new 
TableSubQueryNode(protoNode.getNodeId());
     tableSubQuery.init(proto.getTableName(), nodeMap.get(proto.getChildSeq()));
     tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema()));
     if (proto.getTargetsCount() > 0) {
-      tableSubQuery.setTargets(convertTargets(context, 
proto.getTargetsList()));
+      tableSubQuery.setTargets(convertTargets(context, evalContext, 
proto.getTargetsList()));
     }
 
     return tableSubQuery;
@@ -602,20 +611,21 @@ public class LogicalNodeDeserializer {
     return truncateTable;
   }
 
-  private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context,
+  private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context, EvalContext evalContext,
                                                                        
List<PlanProto.EvalNodeTree> evalTrees) {
     AggregationFunctionCallEval [] aggFuncs = new 
AggregationFunctionCallEval[evalTrees.size()];
     for (int i = 0; i < aggFuncs.length; i++) {
-      aggFuncs[i] = (AggregationFunctionCallEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
+      aggFuncs[i] = (AggregationFunctionCallEval) 
EvalNodeDeserializer.deserialize(context, evalContext,
+          evalTrees.get(i));
     }
     return aggFuncs;
   }
 
-  private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf 
context,
-                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
+  private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf 
context, EvalContext evalContext,
+                                                              
List<PlanProto.EvalNodeTree> evalTrees) {
     WindowFunctionEval[] winFuncEvals = new 
WindowFunctionEval[evalTrees.size()];
     for (int i = 0; i < winFuncEvals.length; i++) {
-      winFuncEvals[i] = (WindowFunctionEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
+      winFuncEvals[i] = (WindowFunctionEval) 
EvalNodeDeserializer.deserialize(context, evalContext, evalTrees.get(i));
     }
     return winFuncEvals;
   }
@@ -632,11 +642,12 @@ public class LogicalNodeDeserializer {
     return columns;
   }
 
-  public static Target[] convertTargets(OverridableConf context, 
List<PlanProto.Target> targetsProto) {
+  public static Target[] convertTargets(OverridableConf context, EvalContext 
evalContext,
+                                        List<PlanProto.Target> targetsProto) {
     Target [] targets = new Target[targetsProto.size()];
     for (int i = 0; i < targets.length; i++) {
       PlanProto.Target targetProto = targetsProto.get(i);
-      EvalNode evalNode = EvalNodeDeserializer.deserialize(context, 
targetProto.getExpr());
+      EvalNode evalNode = EvalNodeDeserializer.deserialize(context, 
evalContext, targetProto.getExpr());
       if (targetProto.hasAlias()) {
         targets[i] = new Target(evalNode, targetProto.getAlias());
       } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto 
b/tajo-plan/src/main/proto/Plan.proto
index c555a1a..de949d5 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -330,25 +330,26 @@ enum EvalType {
   WINDOW_FUNCTION = 18;
   AGG_FUNCTION = 19;
   FUNCTION = 20;
+  PYTHON_FUNCTION = 21;
 
   // String operator or pattern matching predicates
-  LIKE = 21;
-  SIMILAR_TO = 22;
-  REGEX = 23;
-  CONCATENATE = 24;
+  LIKE = 22;
+  SIMILAR_TO = 23;
+  REGEX = 24;
+  CONCATENATE = 25;
 
   // Other predicates
-  BETWEEN = 25;
-  CASE = 26;
-  IF_THEN = 27;
-  IN = 28;
+  BETWEEN = 26;
+  CASE = 27;
+  IF_THEN = 28;
+  IN = 29;
 
   // Value or Reference
-  SIGNED = 29;
-  CAST = 30;
-  ROW_CONSTANT = 31;
-  FIELD = 32;
-  CONST = 33;
+  SIGNED = 30;
+  CAST = 31;
+  ROW_CONSTANT = 32;
+  FIELD = 33;
+  CONST = 34;
 }
 
 message EvalNodeTree {
@@ -476,6 +477,7 @@ message Datum {
   optional string text = 7;
   optional bytes blob = 8;
   optional Interval interval = 12;
+  optional Datum actual = 13; // for ANY type datum
 }
 
 message Interval {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index fccaf2a..9e7f334 100644
--- 
a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -65,7 +65,7 @@ public class TestLazyTuple {
     sb.append(DatumFactory.createFloat4(77.9f)).append('|');
     sb.append(DatumFactory.createFloat8(271.9f)).append('|');
     sb.append(DatumFactory.createText("str2")).append('|');
-    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+    sb.append(DatumFactory.createBlob("jinho")).append('|');
     sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
     sb.append(new String(nullbytes)).append('|');
     sb.append(NullDatum.get());

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
index 739dfe7..d3aee33 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@ -1,6 +1,6 @@
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
\ No newline at end of file
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
index 8ee3408..ec31982 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@ -1 +1 @@
-{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", 
"col10": "192.168.0.1"}

Reply via email to