http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/controller.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/controller.py 
b/tajo-core/src/main/resources/python/controller.py
new file mode 100644
index 0000000..d969b34
--- /dev/null
+++ b/tajo-core/src/main/resources/python/controller.py
@@ -0,0 +1,330 @@
+#! /usr/bin/env python
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import sys
+import os
+import logging
+import base64
+
+from datetime import datetime
+try:
+    from dateutil import parser
+    USE_DATEUTIL = True
+except ImportError:
+    USE_DATEUTIL = False
+
+from tajo_util import write_user_exception, udf_logging
+
+FIELD_DELIMITER = ','
+TUPLE_START = '('
+TUPLE_END = ')'
+BAG_START = '{'
+BAG_END = '}'
+MAP_START = '['
+MAP_END = ']'
+MAP_KEY = '#'
+PARAMETER_DELIMITER = '\t'
+PRE_WRAP_DELIM = '|'
+POST_WRAP_DELIM = '_'
+NULL_BYTE = "-"
+END_RECORD_DELIM = '|_\n'
+END_RECORD_DELIM_LENGTH = len(END_RECORD_DELIM)
+
+WRAPPED_FIELD_DELIMITER = PRE_WRAP_DELIM + FIELD_DELIMITER + POST_WRAP_DELIM
+WRAPPED_TUPLE_START = PRE_WRAP_DELIM + TUPLE_START + POST_WRAP_DELIM
+WRAPPED_TUPLE_END = PRE_WRAP_DELIM + TUPLE_END + POST_WRAP_DELIM
+WRAPPED_BAG_START = PRE_WRAP_DELIM + BAG_START + POST_WRAP_DELIM
+WRAPPED_BAG_END = PRE_WRAP_DELIM + BAG_END + POST_WRAP_DELIM
+WRAPPED_MAP_START = PRE_WRAP_DELIM + MAP_START + POST_WRAP_DELIM
+WRAPPED_MAP_END = PRE_WRAP_DELIM + MAP_END + POST_WRAP_DELIM
+WRAPPED_PARAMETER_DELIMITER = PRE_WRAP_DELIM + PARAMETER_DELIMITER + 
POST_WRAP_DELIM
+WRAPPED_NULL_BYTE = PRE_WRAP_DELIM + NULL_BYTE + POST_WRAP_DELIM
+
+TYPE_TUPLE = TUPLE_START
+TYPE_BAG = BAG_START
+TYPE_MAP = MAP_START
+
+TYPE_BOOLEAN = "B"
+TYPE_INTEGER = "I"
+TYPE_LONG = "L"
+TYPE_FLOAT = "F"
+TYPE_DOUBLE = "D"
+TYPE_BYTEARRAY = "A"
+TYPE_CHARARRAY = "C"
+TYPE_DATETIME = "T"
+TYPE_BIGINTEGER = "N"
+TYPE_BIGDECIMAL = "E"
+
+END_OF_STREAM = TYPE_CHARARRAY + "\x04" + END_RECORD_DELIM
+TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + "TURN_ON_OUTPUT_CAPTURING" + 
END_RECORD_DELIM
+NUM_LINES_OFFSET_TRACE = int(os.environ.get('PYTHON_TRACE_OFFSET', 0))
+
+class PythonStreamingController:
+    def __init__(self, profiling_mode=False):
+        self.profiling_mode = profiling_mode
+
+    def main(self,
+             module_name, file_path, func_name, cache_path,
+             output_stream_path, error_stream_path, log_file_name, 
output_schema):
+        sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
+
+        # Need to ensure that user functions can't write to the streams we use 
to communicate with pig.
+        self.stream_output = os.fdopen(sys.stdout.fileno(), 'wb', 0)
+        self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0)
+
+        self.input_stream = sys.stdin
+        # TODO: support controller logging
+        # self.log_stream = open(output_stream_path, 'a')
+        # sys.stderr = open(error_stream_path, 'w')
+
+        sys.path.append(file_path)
+        sys.path.append(cache_path)
+        sys.path.append('.')
+
+        should_log = False
+        if should_log:
+            logging.basicConfig(filename=log_file_name, format="%(asctime)s 
%(levelname)s %(message)s", level=udf_logging.udf_log_level)
+            logging.info("To reduce the amount of information being logged 
only a small subset of rows are logged at the "
+                         "INFO level.  Call udf_logging.set_log_level_debug in 
tajo_util to see all rows being processed.")
+
+        input_str = self.get_next_input()
+
+        try:
+            func = __import__(module_name, globals(), locals(), [func_name], 
-1).__dict__[func_name]
+        except:
+            # These errors should always be caused by user code.
+            write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
+            self.close_controller(-1)
+
+        log_message = logging.info
+        if udf_logging.udf_log_level == logging.DEBUG:
+            log_message = logging.debug
+
+        while input_str != END_OF_STREAM:
+            try:
+                try:
+                    if should_log:
+                        log_message("Serialized Input: %s" % (input_str))
+                    inputs = deserialize_input(input_str)
+                    if should_log:
+                        log_message("Deserialized Input: %s" % 
(unicode(inputs)))
+                except:
+                    # Capture errors where the user passes in bad data.
+                    write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
+                    self.close_controller(-3)
+
+                try:
+                    func_output = func(*inputs)
+                    if should_log:
+                        log_message("UDF Output: %s" % (unicode(func_output)))
+                except:
+                    # These errors should always be caused by user code.
+                    write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
+                    self.close_controller(-2)
+
+                output = serialize_output(func_output, output_schema)
+                if should_log:
+                    log_message("Serialized Output: %s" % (output))
+
+                self.stream_output.write( "%s%s" % (output, END_RECORD_DELIM) )
+            except Exception as e:
+                # This should only catch internal exceptions with the 
controller
+                # and pig- not with user code.
+                import traceback
+                traceback.print_exc(file=self.stream_error)
+                sys.exit(-3)
+
+            sys.stdout.flush()
+            sys.stderr.flush()
+            self.stream_output.flush()
+            self.stream_error.flush()
+
+            input_str = self.get_next_input()
+
+    def get_next_input(self):
+        input_stream = self.input_stream
+        # log_stream = self.log_stream
+
+        input_str = input_stream.readline()
+
+        while input_str.endswith(END_RECORD_DELIM) == False:
+            line = input_stream.readline()
+            if line == '':
+                input_str = ''
+                break
+            input_str += line
+
+        if input_str == '':
+            return END_OF_STREAM
+
+        if input_str == END_OF_STREAM:
+            return input_str
+
+        return input_str[:-END_RECORD_DELIM_LENGTH]
+
+    def close_controller(self, exit_code):
+        sys.stderr.close()
+        self.stream_error.write("\n")
+        self.stream_error.close()
+        sys.stdout.close()
+        self.stream_output.write("\n")
+        self.stream_output.close()
+        sys.exit(exit_code)
+
+def deserialize_input(input_str):
+    if len(input_str) == 0:
+        return []
+
+    return [_deserialize_input(param, 0, len(param)) for param in 
input_str.split(WRAPPED_FIELD_DELIMITER)]
+
+def _deserialize_input(input_str, si, ei):
+    len = ei - si + 1
+    if len < 1:
+        # Handle all of the cases where you can have valid empty input.
+        if ei == si:
+            if input_str[si] == TYPE_CHARARRAY:
+                return u""
+            elif input_str[si] == TYPE_BYTEARRAY:
+                return bytearray("")
+            else:
+                raise Exception("Got input type flag %s, but no data to go 
with it.\nInput string: %s\nSlice: %s" % (input_str[si], input_str, 
input_str[si:ei+1]))
+        else:
+            raise Exception("Start index %d greater than end index %d.\nInput 
string: %s\n, Slice: %s" % (si, ei, input_str[si:ei+1]))
+
+    tokens = input_str.split(WRAPPED_PARAMETER_DELIMITER)
+    schema = tokens[0];
+    param = tokens[1];
+
+    if schema == NULL_BYTE:
+        return None
+    elif schema == TYPE_CHARARRAY:
+        return unicode(param, 'utf-8')
+    elif schema == TYPE_BYTEARRAY:
+        return bytearray(param)
+    elif schema == TYPE_INTEGER:
+        return int(param)
+    elif schema == TYPE_LONG or schema == TYPE_BIGINTEGER:
+        return long(param)
+    elif schema == TYPE_FLOAT or schema == TYPE_DOUBLE or schema == 
TYPE_BIGDECIMAL:
+        return float(param)
+    elif schema == TYPE_BOOLEAN:
+        return param == "true"
+    elif schema == TYPE_DATETIME:
+        # Format is "yyyy-MM-ddTHH:mm:ss.SSS+00:00" or 
"2013-08-23T18:14:03.123+ZZ"
+        if USE_DATEUTIL:
+            return parser.parse(param)
+        else:
+            # Try to use datetime even though it doesn't handle time zones 
properly,
+            # We only use the first 3 microsecond digits and drop time zone 
(first 23 characters)
+            return datetime.strptime(param, "%Y-%m-%dT%H:%M:%S.%f")
+    else:
+        raise Exception("Can't determine type of input: %s" % param)
+
+def _deserialize_collection(input_str, return_type, si, ei):
+    list_result = []
+    append_to_list_result = list_result.append
+    dict_result = {}
+
+    index = si
+    field_start = si
+    depth = 0
+
+    key = None
+
+    # recurse to deserialize elements if the collection is not empty
+    if ei-si+1 > 0:
+        while True:
+            if index >= ei - 2:
+                if return_type == TYPE_MAP:
+                    dict_result[key] = _deserialize_input(input_str, 
value_start, ei)
+                else:
+                    append_to_list_result(_deserialize_input(input_str, 
field_start, ei))
+                break
+
+            if return_type == TYPE_MAP and not key:
+                key_index = input_str.find(MAP_KEY, index)
+                key = unicode(input_str[index+1:key_index], 'utf-8')
+                index = key_index + 1
+                value_start = key_index + 1
+                continue
+
+            if not (input_str[index] == PRE_WRAP_DELIM and input_str[index+2] 
== POST_WRAP_DELIM):
+                prewrap_index = input_str.find(PRE_WRAP_DELIM, index+1)
+                index = (prewrap_index if prewrap_index != -1 else end_index)
+                continue
+
+            mid = input_str[index+1]
+
+            if mid == BAG_START or mid == TUPLE_START or mid == MAP_START:
+                depth += 1
+            elif mid == BAG_END or mid == TUPLE_END or mid == MAP_END:
+                depth -= 1
+            elif depth == 0 and mid == FIELD_DELIMITER:
+                if return_type == TYPE_MAP:
+                    dict_result[key] = _deserialize_input(input_str, 
value_start, index - 1)
+                    key = None
+                else:
+                    append_to_list_result(_deserialize_input(input_str, 
field_start, index - 1))
+                field_start = index + 3
+
+            index += 3
+
+    if return_type == TYPE_MAP:
+        return dict_result
+    elif return_type == TYPE_TUPLE:
+        return tuple(list_result)
+    else:
+        return list_result
+
+def wrap_tuple(o, serialized_item):
+    if type(o) != tuple:
+        return WRAPPED_TUPLE_START + serialized_item + WRAPPED_TUPLE_END
+    else:
+        return serialized_item
+
+def serialize_output(output, out_schema, utfEncodeAllFields=False):
+    """
+    @param utfEncodeStrings - Generally we want to utf encode only strings.  
But for
+        Maps we utf encode everything because on the Java side we don't know 
the schema
+        for maps so we wouldn't be able to tell which fields were encoded or 
not.
+    """
+
+    output_type = type(output)
+
+    if output is None:
+        result = WRAPPED_NULL_BYTE
+    elif output_type == bool:
+        result = ("true" if output else "false")
+    elif output_type == bytearray:
+        result = str(output)
+    elif output_type == datetime:
+        result = output.isoformat()
+    elif utfEncodeAllFields or output_type == str or output_type == unicode:
+        # unicode is necessary in cases where we're encoding non-strings.
+        result = unicode(output).encode('utf-8')
+    else:
+        result = str(output)
+
+    if out_schema == "blob":
+        return base64.b64encode(result)
+    else:
+        return result
+
+if __name__ == '__main__':
+    controller = PythonStreamingController()
+    controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4],
+                    sys.argv[5], sys.argv[6], sys.argv[7], sys.argv[8])

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/tajo_util.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/tajo_util.py 
b/tajo-core/src/main/resources/python/tajo_util.py
new file mode 100644
index 0000000..77b28a6
--- /dev/null
+++ b/tajo-core/src/main/resources/python/tajo_util.py
@@ -0,0 +1,103 @@
+#!/usr/bin/python
+
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import logging
+
+class udf_logging(object):
+    udf_log_level = logging.INFO
+
+    @classmethod
+    def set_log_level_error(cls):
+        cls.udf_log_level = logging.ERROR
+
+    @classmethod
+    def set_log_level_warn(cls):
+        cls.udf_log_level = logging.WARN
+
+    @classmethod
+    def set_log_level_info(cls):
+        cls.udf_log_level = logging.INFO
+
+    @classmethod
+    def set_log_level_debug(cls):
+        cls.udf_log_level = logging.DEBUG
+
+def outputType(type_str):
+    def wrap(f):
+        def wrapped_f(*args):
+            return f(*args)
+        return wrapped_f
+    return wrap
+
+def write_user_exception(filename, stream_err_output, 
num_lines_offset_trace=0):
+    import sys
+    import traceback
+    import inspect
+    (t, v, tb) = sys.exc_info()
+    name = t.__name__
+    record_error = False
+
+    if name in ['SyntaxError', 'IndentationError']:
+        syntax_error_values = v.args
+        user_line_number = syntax_error_values[1][1] - num_lines_offset_trace
+        error_message =  "%s: %s\n\tFile: %s, line %s column %s\n\t%s" % \
+                         (name,
+                          syntax_error_values[0],
+                          syntax_error_values[1][0],
+                          user_line_number,
+                          syntax_error_values[1][2],
+                          syntax_error_values[1][3])
+    else:
+        error_message = "%s: %s\n" % (name, v)
+        user_line_number = None
+        while 1:
+            e_file_name = tb.tb_frame.f_code.co_filename
+            if e_file_name.find(filename) > 0:
+                record_error = True
+            if not record_error:
+                if not tb.tb_next:
+                    break
+                tb = tb.tb_next
+                continue
+
+            line_number = tb.tb_lineno
+            mod = inspect.getmodule(tb)
+            if mod:
+                lines, offset = inspect.getsourcelines(mod)
+                line = lines[line_number - offset - 1]
+            else:
+                #Useful to catch exceptions with an invalid module (like syntax
+                #errors)
+                lines, offset = inspect.getsourcelines(tb.tb_frame)
+                if (line_number - 1) >= len(lines):
+                    line = "Unknown Line"
+                else:
+                    line = lines[line_number - 1]
+
+            user_line_number = line_number - num_lines_offset_trace
+            func_name = tb.tb_frame.f_code.co_name
+            error_message += 'File %s, line %s, in %s\n\t%s\n' % \
+                             (e_file_name, user_line_number, func_name, line)
+            if not tb.tb_next:
+                break
+            tb = tb.tb_next
+        if name in ['UnicodeEncodeError']:
+            error_message += "\nTo print a unicode string in your udf use 
encode('utf-8').  Example: \n\tprint 'Example'.encode('utf-8')"
+    if user_line_number:
+        stream_err_output.write("%s\n" % user_line_number)
+    stream_err_output.write("%s\n" % error_message)

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index e680736..54f9e92 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -57,6 +57,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
@@ -160,6 +161,8 @@ public class TajoTestingCluster {
     // Memory cache termination
     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
 
+    conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, 
getClass().getResource("/python").toString());
+
     /* Since Travi CI limits the size of standard output log up to 4MB */
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index e2dac05..36ffd0c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -36,7 +36,10 @@ import org.apache.tajo.engine.codegen.TajoClassLoader;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.master.exec.QueryExecutor;
 import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.serder.EvalNodeDeserializer;
 import org.apache.tajo.plan.serder.EvalNodeSerializer;
@@ -58,6 +61,7 @@ import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
@@ -91,7 +95,9 @@ public class ExprTestBase {
     cat = util.getMiniCatalogCluster().getCatalog();
     cat.createTablespace(DEFAULT_TABLESPACE_NAME, 
"hdfs://localhost:1234/warehouse");
     cat.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    for (FunctionDesc funcDesc : FunctionLoader.load()) {
+    Map<FunctionSignature, FunctionDesc> map = FunctionLoader.load();
+    map = FunctionLoader.loadUserDefinedFunctions(conf, map);
+    for (FunctionDesc funcDesc : map.values()) {
       cat.createFunction(funcDesc);
     }
 
@@ -126,8 +132,8 @@ public class ExprTestBase {
    * @return
    * @throws PlanningException
    */
-  private static Target[] getRawTargets(QueryContext context, String query, 
boolean condition) throws PlanningException,
-      InvalidStatementException {
+  private static Target[] getRawTargets(QueryContext context, String query, 
boolean condition)
+      throws PlanningException, InvalidStatementException {
 
     List<ParsedResult> parsedResults = SimpleParser.parseScript(query);
     if (parsedResults.size() > 1) {
@@ -265,6 +271,7 @@ public class ExprTestBase {
     Target [] targets;
 
     TajoClassLoader classLoader = new TajoClassLoader();
+    EvalContext evalContext = new EvalContext();
 
     try {
       targets = getRawTargets(queryContext, query, condition);
@@ -274,6 +281,7 @@ public class ExprTestBase {
         codegen = new EvalCodeGenerator(classLoader);
       }
 
+      QueryExecutor.startScriptExecutors(queryContext, evalContext, targets);
       Tuple outTuple = new VTuple(targets.length);
       for (int i = 0; i < targets.length; i++) {
         EvalNode eval = targets[i].getEvalTree();
@@ -281,7 +289,7 @@ public class ExprTestBase {
         if (queryContext.getBool(SessionVars.CODEGEN)) {
           eval = codegen.compile(inputSchema, eval);
         }
-        eval.bind(inputSchema);
+        eval.bind(evalContext, inputSchema);
 
         outTuple.put(i, eval.eval(vtuple));
       }
@@ -318,11 +326,12 @@ public class ExprTestBase {
       if (schema != null) {
         cat.dropTable(qualifiedTableName);
       }
+      QueryExecutor.stopScriptExecutors(evalContext);
     }
   }
 
   public static void assertEvalTreeProtoSerDer(OverridableConf context, 
EvalNode evalNode) {
     PlanProto.EvalNodeTree converted = EvalNodeSerializer.serialize(evalNode);
-    assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, 
converted));
+    assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, null, 
converted));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index a2d0598..ea42783 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -48,7 +48,7 @@ public class TestEvalTree extends ExprTestBase {
     schema1.addColumn("table1.score", INT4);
     
     BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    expr.bind(schema1);
+    expr.bind(null, schema1);
 
     assertCloneEqual(expr);
     VTuple tuple = new VTuple(2);
@@ -161,19 +161,19 @@ public class TestEvalTree extends ExprTestBase {
     MockFalseExpr falseExpr = new MockFalseExpr();
 
     BinaryEval andExpr = new BinaryEval(EvalType.AND, trueExpr, trueExpr);
-    andExpr.bind(null);
+    andExpr.bind(null, null);
     assertTrue(andExpr.eval(null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, falseExpr, trueExpr);
-    andExpr.bind(null);
+    andExpr.bind(null, null);
     assertFalse(andExpr.eval(null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, trueExpr, falseExpr);
-    andExpr.bind(null);
+    andExpr.bind(null, null);
     assertFalse(andExpr.eval(null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, falseExpr, falseExpr);
-    andExpr.bind(null);
+    andExpr.bind(null, null);
     assertFalse(andExpr.eval(null).asBool());
   }
 
@@ -183,19 +183,19 @@ public class TestEvalTree extends ExprTestBase {
     MockFalseExpr falseExpr = new MockFalseExpr();
 
     BinaryEval orExpr = new BinaryEval(EvalType.OR, trueExpr, trueExpr);
-    orExpr.bind(null);
+    orExpr.bind(null, null);
     assertTrue(orExpr.eval(null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, falseExpr, trueExpr);
-    orExpr.bind(null);
+    orExpr.bind(null, null);
     assertTrue(orExpr.eval(null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, trueExpr, falseExpr);
-    orExpr.bind(null);
+    orExpr.bind(null, null);
     assertTrue(orExpr.eval(null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, falseExpr, falseExpr);
-    orExpr.bind(null);
+    orExpr.bind(null, null);
     assertFalse(orExpr.eval(null).asBool());
   }
 
@@ -209,41 +209,41 @@ public class TestEvalTree extends ExprTestBase {
     e1 = new ConstEval(DatumFactory.createInt4(9));
     e2 = new ConstEval(DatumFactory.createInt4(34));
     expr = new BinaryEval(EvalType.LTH, e1, e2);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e1, e2);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LTH, e2, e1);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e2, e1);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
 
     expr = new BinaryEval(EvalType.GTH, e2, e1);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e2, e1);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GTH, e1, e2);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e1, e2);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
 
     BinaryEval plus = new BinaryEval(EvalType.PLUS, e1, e2);
     expr = new BinaryEval(EvalType.LTH, e1, plus);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e1, plus);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LTH, plus, e1);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.LEQ, plus, e1);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
 
     expr = new BinaryEval(EvalType.GTH, plus, e1);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GEQ, plus, e1);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GTH, e1, plus);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e1, plus);
-    assertFalse(expr.bind(null).eval(null).asBool());
+    assertFalse(expr.bind(null, null).eval(null).asBool());
   }
 
   @Test
@@ -256,28 +256,28 @@ public class TestEvalTree extends ExprTestBase {
     e1 = new ConstEval(DatumFactory.createInt4(9));
     e2 = new ConstEval(DatumFactory.createInt4(34));
     BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    assertEquals(expr.bind(null).eval(null).asInt4(), 43);
+    assertEquals(expr.bind(null, null).eval(null).asInt4(), 43);
     assertCloneEqual(expr);
     
     // MINUS
     e1 = new ConstEval(DatumFactory.createInt4(5));
     e2 = new ConstEval(DatumFactory.createInt4(2));
     expr = new BinaryEval(EvalType.MINUS, e1, e2);
-    assertEquals(expr.bind(null).eval(null).asInt4(), 3);
+    assertEquals(expr.bind(null, null).eval(null).asInt4(), 3);
     assertCloneEqual(expr);
     
     // MULTIPLY
     e1 = new ConstEval(DatumFactory.createInt4(5));
     e2 = new ConstEval(DatumFactory.createInt4(2));
     expr = new BinaryEval(EvalType.MULTIPLY, e1, e2);
-    assertEquals(expr.bind(null).eval(null).asInt4(), 10);
+    assertEquals(expr.bind(null, null).eval(null).asInt4(), 10);
     assertCloneEqual(expr);
     
     // DIVIDE
     e1 = new ConstEval(DatumFactory.createInt4(10));
     e2 = new ConstEval(DatumFactory.createInt4(5));
     expr = new BinaryEval(EvalType.DIVIDE, e1, e2);
-    assertEquals(expr.bind(null).eval(null).asInt4(), 2);
+    assertEquals(expr.bind(null, null).eval(null).asInt4(), 2);
     assertCloneEqual(expr);
   }
 
@@ -293,7 +293,7 @@ public class TestEvalTree extends ExprTestBase {
     assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType());
 
     expr = new BinaryEval(EvalType.LTH, e1, e2);
-    assertTrue(expr.bind(null).eval(null).asBool());
+    assertTrue(expr.bind(null, null).eval(null).asBool());
     assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType());
 
     e1 = new ConstEval(DatumFactory.createFloat8(9.3));
@@ -384,7 +384,7 @@ public class TestEvalTree extends ExprTestBase {
       binEval.eval(null);
       fail("EvalNode is not binded");
     } catch (IllegalStateException e) {
-      assertTrue(binEval.bind(null).eval(null).asBool());
+      assertTrue(binEval.bind(null, null).eval(null).asBool());
     }
 
     CaseWhenEval caseWhenEval = new CaseWhenEval();
@@ -393,7 +393,7 @@ public class TestEvalTree extends ExprTestBase {
       caseWhenEval.eval(null);
       fail("EvalNode is not binded");
     } catch (IllegalStateException e) {
-      assertEquals(caseWhenEval.bind(null).eval(null).asInt4(), 1);
+      assertEquals(caseWhenEval.bind(null, null).eval(null).asInt4(), 1);
     }
 
     Schema schema = new Schema(new Column[]{new Column("test", 
TajoDataTypes.Type.INT4)});
@@ -404,7 +404,7 @@ public class TestEvalTree extends ExprTestBase {
       regexEval.eval(null);
       fail("EvalNode is not binded");
     } catch (IllegalStateException e) {
-      assertEquals(regexEval.bind(schema).eval(tuple).asBool(), true);
+      assertEquals(regexEval.bind(null, schema).eval(tuple).asBool(), true);
     }
 
     RowConstantEval rowConstantEval = new RowConstantEval(new Datum[]{});
@@ -412,7 +412,7 @@ public class TestEvalTree extends ExprTestBase {
       rowConstantEval.eval(null);
       fail("EvalNode is not binded");
     } catch (IllegalStateException e) {
-      assertEquals(rowConstantEval.bind(null).eval(null).isNull(), true);
+      assertEquals(rowConstantEval.bind(null, null).eval(null).isNull(), true);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index e23a34b..0466a24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -265,12 +265,12 @@ public class TestEvalTreeUtil {
     FieldEval field = first.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
     assertEquals(EvalType.LTH, first.getType());
-    assertEquals(10, first.getRightExpr().bind(null).eval(null).asInt4());
+    assertEquals(10, first.getRightExpr().bind(null, 
null).eval(null).asInt4());
     
     field = second.getRightExpr();
     assertEquals(col1, field.getColumnRef());
     assertEquals(EvalType.LTH, second.getType());
-    assertEquals(4, second.getLeftExpr().bind(null).eval(null).asInt4());
+    assertEquals(4, second.getLeftExpr().bind(null, null).eval(null).asInt4());
   }
   
   @Test
@@ -304,10 +304,10 @@ public class TestEvalTreeUtil {
     Target [] targets = getRawTargets(QUERIES[0]);
     EvalNode node = 
AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
     assertEquals(EvalType.CONST, node.getType());
-    assertEquals(7, node.bind(null).eval(null).asInt4());
+    assertEquals(7, node.bind(null, null).eval(null).asInt4());
     node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
     assertEquals(EvalType.CONST, node.getType());
-    assertTrue(7.0d == node.bind(null).eval(null).asFloat8());
+    assertTrue(7.0d == node.bind(null, null).eval(null).asFloat8());
 
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalPlan plan = planner.createPlan(defaultContext, expr, true);
@@ -335,7 +335,7 @@ public class TestEvalTreeUtil {
     assertEquals(EvalType.GTH, transposed.getType());
     FieldEval field = transposed.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
-    assertEquals(1, transposed.getRightExpr().bind(null).eval(null).asInt4());
+    assertEquals(1, transposed.getRightExpr().bind(null, 
null).eval(null).asInt4());
 
     node = getRootSelection(QUERIES[4]);
     // we expect that score < 3
@@ -343,7 +343,7 @@ public class TestEvalTreeUtil {
     assertEquals(EvalType.LTH, transposed.getType());
     field = transposed.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
-    assertEquals(2, transposed.getRightExpr().bind(null).eval(null).asInt4());
+    assertEquals(2, transposed.getRightExpr().bind(null, 
null).eval(null).asInt4());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java
index 49ef7e0..78509f7 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java
@@ -394,9 +394,9 @@ public class TestMathFunctions extends ExprTestBase {
 
   @Test
   public void testPow() throws IOException {
-    testSimpleEval("select pow(9,3) as col1 ", new 
String[]{String.valueOf(Math.pow(9,3))});
-    testSimpleEval("select pow(1.0,3) as col1 ", new 
String[]{String.valueOf(Math.pow(1.0,3))});
-    testSimpleEval("select pow(20.1,3.1) as col1 ", new 
String[]{String.valueOf(Math.pow(20.1,3.1))});
+    testSimpleEval("select pow(9,3) as col1 ", new 
String[]{String.valueOf(Math.pow(9, 3))});
+    testSimpleEval("select pow(1.0,3) as col1 ", new 
String[]{String.valueOf(Math.pow(1.0, 3))});
+    testSimpleEval("select pow(20.1,3.1) as col1 ", new 
String[]{String.valueOf(Math.pow(20.1, 3.1))});
     testSimpleEval("select pow(null,3.1) as col1 ", new String[]{""});
     testSimpleEval("select pow(20.1,null) as col1 ", new String[]{""});
 
@@ -408,8 +408,8 @@ public class TestMathFunctions extends ExprTestBase {
 
     testEval(schema, "table1", "0.4,2.7,3,2", "select pow(col1, col2), 
pow(col3, col4) from table1",
         new String[]{
-            String.valueOf(Math.pow((float)0.4, 2.7)),
-            String.valueOf(Math.pow(3,2))
+            String.valueOf(Math.pow((float) 0.4, 2.7)),
+            String.valueOf(Math.pow(3, 2))
         });
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java
new file mode 100644
index 0000000..47a0ad2
--- /dev/null
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function;
+
+import org.apache.tajo.engine.eval.ExprTestBase;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestPythonFunctions extends ExprTestBase {
+
+  @Test
+  public void testFunctions() throws IOException {
+    testSimpleEval("select return_one()", new String[]{"1"});
+    testSimpleEval("select helloworld()", new String[]{"Hello, World"});
+    testSimpleEval("select concat_py('1')", new String[]{"11"});
+    testSimpleEval("select comma_format(12345)", new String[]{"12,345"});
+    testSimpleEval("select sum_py(1,2)", new String[]{"3"});
+    testSimpleEval("select percent(386, 1000)", new String[]{"38.6"});
+    testSimpleEval("select concat4('Tajo', 'is', 'awesome', '!')", new 
String[]{"Tajo is awesome !"});
+  }
+
+  @Test
+  public void testNestedFunctions() throws IOException {
+    testSimpleEval("select sum_py(3, return_one())", new String[]{"4"});
+    testSimpleEval("select concat_py(helloworld())", new String[]{"Hello, 
WorldHello, World"});
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index d1756e1..15a1c9f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -799,4 +799,18 @@ public class TestGroupByQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testGroupbyWithPythonFunc() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testGroupbyWithPythonFunc2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index 513868b..002f05d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -710,6 +710,27 @@ public class TestSelectQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public void testSelectPythonFuncs() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testSelectWithPredicateOnPythonFunc() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testNestedPythonFunction() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
   public void testSelectWithParentheses1() throws Exception {
     ResultSet res = executeQuery();
     assertResultSet(res);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/__init__.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/python/__init__.py 
b/tajo-core/src/test/resources/python/__init__.py
new file mode 100644
index 0000000..8093a2f
--- /dev/null
+++ b/tajo-core/src/test/resources/python/__init__.py
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/python/test_funcs.py 
b/tajo-core/src/test/resources/python/test_funcs.py
new file mode 100644
index 0000000..d6b7db5
--- /dev/null
+++ b/tajo-core/src/test/resources/python/test_funcs.py
@@ -0,0 +1,33 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from tajo_util import outputType
+
+@outputType('int4')
+def return_one():
+    return 1
+
+@outputType("text")
+def helloworld():
+    return 'Hello, World'
+
+# No decorator - blob
+def concat_py(str):
+    return str+str
+
+@outputType('int4')
+def sum_py(a,b):
+    return a+b

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs.pyc
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/python/test_funcs.pyc 
b/tajo-core/src/test/resources/python/test_funcs.pyc
new file mode 100644
index 0000000..cc84dc1
Binary files /dev/null and b/tajo-core/src/test/resources/python/test_funcs.pyc 
differ

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs2.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/python/test_funcs2.py 
b/tajo-core/src/test/resources/python/test_funcs2.py
new file mode 100644
index 0000000..e8db7b5
--- /dev/null
+++ b/tajo-core/src/test/resources/python/test_funcs2.py
@@ -0,0 +1,32 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from tajo_util import outputType
+
+#Percent- Percentage
+@outputType("float8")
+def percent(num, total):
+    return num * 100 / float(total)
+
+#commaFormat- format a number with commas, 12345-> 12,345
+@outputType("text")
+def comma_format(num):
+    return '{:,}'.format(num)
+
+#concatMultiple- concat multiple words
+@outputType("text")
+def concat4(word1, word2, word3, word4):
+    return word1 + " " + word2 + " " + word3 + " " + word4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql
 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql
new file mode 100644
index 0000000..888552a
--- /dev/null
+++ 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql
@@ -0,0 +1 @@
+select count(*) from nation where sum_py(n_nationkey, 1) > 2 group by 
n_regionkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql
 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql
new file mode 100644
index 0000000..bcfce13
--- /dev/null
+++ 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql
@@ -0,0 +1 @@
+select n_regionkey, count(*) as cnt from nation group by n_regionkey having 
percent(cnt, 25) > 10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql
 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql
new file mode 100644
index 0000000..75b33ae
--- /dev/null
+++ 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql
@@ -0,0 +1 @@
+select * from nation where sum_py(n_regionkey, return_one()) < 2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql
 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql
new file mode 100644
index 0000000..bcb9806
--- /dev/null
+++ 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql
@@ -0,0 +1,2 @@
+select helloworld(), sum_py(n_nationkey, n_regionkey) as sum, 
concat_py(n_name) as concat
+from nation where n_nationkey < 5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql
 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql
new file mode 100644
index 0000000..d2c5082
--- /dev/null
+++ 
b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql
@@ -0,0 +1 @@
+select * from nation where sum_py(n_regionkey,1) > 2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
new file mode 100644
index 0000000..2a5fb8a
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result
@@ -0,0 +1,7 @@
+?count
+-------------------------------
+4
+5
+5
+5
+4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
new file mode 100644
index 0000000..08561fd
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result
@@ -0,0 +1,7 @@
+n_regionkey,cnt
+-------------------------------
+0,5
+3,5
+4,5
+2,5
+1,5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result
 
b/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result
new file mode 100644
index 0000000..899f034
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result
@@ -0,0 +1,7 @@
+n_nationkey,n_name,n_regionkey,n_comment
+-------------------------------
+0,ALGERIA,0, haggle. carefully final deposits detect slyly agai
+5,ETHIOPIA,0,ven packages wake quickly. regu
+14,KENYA,0, pending excuses haggle furiously deposits. pending, express pinto 
beans wake fluffily past t
+15,MOROCCO,0,rns. blithely bold courts among the closely regular packages use 
furiously bold platelets?
+16,MOZAMBIQUE,0,s. ironic, unusual asymptotes wake blithely r
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result
 
b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result
new file mode 100644
index 0000000..877e9fc
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result
@@ -0,0 +1,7 @@
+?helloworld,sum,concat
+-------------------------------
+Hello, World,0,ALGERIAALGERIA
+Hello, World,2,ARGENTINAARGENTINA
+Hello, World,3,BRAZILBRAZIL
+Hello, World,4,CANADACANADA
+Hello, World,8,EGYPTEGYPT
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result
 
b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result
new file mode 100644
index 0000000..6d533b4
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result
@@ -0,0 +1,17 @@
+n_nationkey,n_name,n_regionkey,n_comment
+-------------------------------
+4,EGYPT,4,y above the carefully unusual theodolites. final dugouts are quickly 
across the furiously regular d
+6,FRANCE,3,refully final requests. regular, ironi
+7,GERMANY,3,l platelets. regular accounts x-ray: unusual, regular acco
+8,INDIA,2,ss excuses cajole slyly across the packages. deposits print aroun
+9,INDONESIA,2, slyly express asymptotes. regular deposits haggle slyly. 
carefully ironic hockey players sleep blithely. carefull
+10,IRAN,4,efully alongside of the slyly final dependencies. 
+11,IRAQ,4,nic deposits boost atop the quickly final requests? quickly regula
+12,JAPAN,2,ously. final, express gifts cajole a
+13,JORDAN,4,ic deposits are blithely about the carefully regular pa
+18,CHINA,2,c dependencies. furiously express notornis sleep slyly regular 
accounts. ideas sleep. depos
+19,ROMANIA,3,ular asymptotes are about the furious multipliers. express 
dependencies nag above the ironically ironic account
+20,SAUDI ARABIA,4,ts. silent requests haggle. closely express packages sleep 
across the blithely
+21,VIETNAM,2,hely enticingly express accounts. even, final 
+22,RUSSIA,3, requests against the platelets use never according to the quickly 
regular pint
+23,UNITED KINGDOM,3,eans boost carefully special requests. accounts are. 
carefull
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-docs/src/main/sphinx/functions.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/functions.rst 
b/tajo-docs/src/main/sphinx/functions.rst
index 453edf4..8200bfd 100644
--- a/tajo-docs/src/main/sphinx/functions.rst
+++ b/tajo-docs/src/main/sphinx/functions.rst
@@ -2,6 +2,12 @@
 Functions
 ******************
 
+Tajo provides extensive supports for functions. It includes a lot of built-in 
functions and user-defined functions which is implemented in Python.
+
+===================
+Built-in Functions
+===================
+
 .. toctree::
     :maxdepth: 1
 
@@ -9,4 +15,66 @@ Functions
     functions/string_func_and_operators
     functions/datetime_func_and_operators
     functions/network_func_and_operators
-    functions/json_func
\ No newline at end of file
+    functions/json_func
+
+==============================
+Python User-defined Functions
+==============================
+
+-----------------------
+Function registration
+-----------------------
+
+To register Python UDFs, you must install script files in all cluster nodes.
+After that, you can register your functions by specifying the paths to those 
script files in ``tajo-site.xml``. Here is an example of the configuration.
+
+.. code-block:: xml
+
+  <property>
+    <name>tajo.function.python.code-dir</name>
+    <value>/path/to/script1.py,/path/to/script2.py</value>
+  </property>
+
+Please note that you can specify multiple paths with ``','`` as a delimiter. 
Each file can contain multiple functions. Here is a typical example of a script 
file.
+
+.. code-block:: python
+
+  # /path/to/script1.py
+
+  @outputType('int4')
+  def return_one():
+    return 1
+
+  @outputType("text")
+  def helloworld():
+    return 'Hello, World'
+
+  # No decorator - blob
+  def concat_py(str):
+    return str+str
+
+  @outputType('int4')
+  def sum_py(a,b):
+    return a+b
+
+If the configuration is set properly, every function in the script files are 
registered when the Tajo cluster starts up.
+
+-----------------------
+Decorators and types
+-----------------------
+
+By default, every function has a return type of ``BLOB``.
+You can use Python decorators to define output types for the script functions. 
Tajo can figure out return types from the annotations of the Python script.
+
+* ``outputType``: Defines the return data type for a script UDF in a format 
that Tajo can understand. The defined type must be one of the types supported 
by Tajo. For supported types, please refer to :doc:`/sql_language/data_model`.
+
+-----------------------
+Query example
+-----------------------
+
+Once the Python UDFs are successfully registered, you can use them as other 
built-in functions.
+
+.. code-block:: sql
+
+  default> select concat_py(n_name)::text from nation where 
sum_py(n_regionkey,1) > 2;
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-docs/src/main/sphinx/functions/json_func.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/functions/json_func.rst 
b/tajo-docs/src/main/sphinx/functions/json_func.rst
index 5bf5814..d05cc1b 100644
--- a/tajo-docs/src/main/sphinx/functions/json_func.rst
+++ b/tajo-docs/src/main/sphinx/functions/json_func.rst
@@ -3,6 +3,7 @@ JSON Functions
 *******************************
 
 .. function:: json_extract_path_text (string json, string xpath)
+
   Extracts JSON string from a JSON string based on json path specified and 
returns JSON string pointed to by xPath
 
   :param string:

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
index 42a7380..0c5a012 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.set.UnmodifiableSet;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
@@ -34,7 +35,6 @@ import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.function.AggFunction;
-import org.apache.tajo.plan.function.GeneralFunction;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.nameresolver.NameResolver;
 import org.apache.tajo.plan.nameresolver.NameResolvingMode;
@@ -43,6 +43,7 @@ import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.apache.tajo.util.datetime.TimeMeta;
 
+import java.io.IOException;
 import java.util.Set;
 import java.util.Stack;
 import java.util.TimeZone;
@@ -384,7 +385,7 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
       if (!EvalTreeUtil.checkIfCanBeConstant(evalNodes[i])) {
         throw new PlanningException("Non constant values cannot be included in 
IN PREDICATE.");
       }
-      values[i] = EvalTreeUtil.evaluateImmediately(evalNodes[i]);
+      values[i] = EvalTreeUtil.evaluateImmediately(null, evalNodes[i]);
     }
     return new RowConstantEval(values);
   }
@@ -608,7 +609,7 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
       FunctionType functionType = funcDesc.getFuncType();
       if (functionType == FunctionType.GENERAL
           || functionType == FunctionType.UDF) {
-        return new GeneralFunctionEval(ctx.queryContext, funcDesc, 
(GeneralFunction) funcDesc.newInstance(), givenArgs);
+        return new GeneralFunctionEval(ctx.queryContext, funcDesc, givenArgs);
       } else if (functionType == FunctionType.AGGREGATION
           || functionType == FunctionType.UDA) {
         if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
@@ -623,6 +624,8 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
       }
     } catch (InternalException e) {
       throw new PlanningException(e);
+    } catch (IOException e) {
+      throw new PlanningException(e);
     }
   }
 
@@ -679,7 +682,8 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
   }
 
   public static final Set<String> WINDOW_FUNCTIONS =
-      Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", 
"cume_dist", "first_value", "lag");
+      UnmodifiableSet.decorate(
+          Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", 
"cume_dist", "first_value", "lag"));
 
   public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, 
WindowFunctionExpr windowFunc)
       throws PlanningException {
@@ -904,9 +908,9 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
   public static int [] dateToIntArray(String years, String months, String days)
       throws PlanningException {
-    int year = Integer.valueOf(years);
-    int month = Integer.valueOf(months);
-    int day = Integer.valueOf(days);
+    int year = Integer.parseInt(years);
+    int month = Integer.parseInt(months);
+    int day = Integer.parseInt(days);
 
     if (!(1 <= year && year <= 9999)) {
       throw new PlanningException(String.format("Years (%d) must be between 1 
and 9999 integer value", year));
@@ -930,12 +934,12 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
   public static int [] timeToIntArray(String hours, String minutes, String 
seconds, String fractionOfSecond)
       throws PlanningException {
-    int hour = Integer.valueOf(hours);
-    int minute = Integer.valueOf(minutes);
-    int second = Integer.valueOf(seconds);
+    int hour = Integer.parseInt(hours);
+    int minute = Integer.parseInt(minutes);
+    int second = Integer.parseInt(seconds);
     int fraction = 0;
     if (fractionOfSecond != null) {
-      fraction = Integer.valueOf(fractionOfSecond);
+      fraction = Integer.parseInt(fractionOfSecond);
     }
 
     if (!(0 <= hour && hour <= 23)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index d7c631b..d1c1a15 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -115,6 +115,10 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return queryBlock;
     }
 
+    public OverridableConf getQueryContext() {
+      return queryContext;
+    }
+
     public String toString() {
       return "block=" + queryBlock.getName() + ", relNum=" + 
queryBlock.getRelations().size() + ", "+
           queryBlock.namedExprsMgr.toString();
@@ -132,7 +136,8 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   @VisibleForTesting
-  public LogicalPlan createPlan(OverridableConf queryContext, Expr expr, 
boolean debug) throws PlanningException {
+  public LogicalPlan createPlan(OverridableConf queryContext, Expr expr, 
boolean debug)
+      throws PlanningException {
 
     LogicalPlan plan = new LogicalPlan(this);
 
@@ -810,7 +815,7 @@ public class LogicalPlanner extends 
BaseAlgebraVisitor<LogicalPlanner.PlanContex
     limitNode.setInSchema(child.getOutSchema());
     limitNode.setOutSchema(child.getOutSchema());
 
-    firstFetNum.bind(null);
+    firstFetNum.bind(null, null);
     limitNode.setFetchFirst(firstFetNum.eval(null).asInt8());
 
     return limitNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
index fb05f33..6cf7272 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
@@ -149,7 +149,7 @@ public class AlgebraicUtil {
       }
 
       if (lhs.getType() == EvalType.CONST && rhs.getType() == EvalType.CONST) {
-        return new ConstEval(binaryEval.bind(null).eval(null));
+        return new ConstEval(binaryEval.bind(null, null).eval(null));
       }
 
       return binaryEval;
@@ -162,7 +162,7 @@ public class AlgebraicUtil {
       stack.pop();
 
       if (child.getType() == EvalType.CONST) {
-        return new ConstEval(unaryEval.bind(null).eval(null));
+        return new ConstEval(unaryEval.bind(null, null).eval(null));
       }
 
       return unaryEval;
@@ -184,7 +184,7 @@ public class AlgebraicUtil {
       }
 
       if (constantOfAllDescendents && evalNode.getType() == EvalType.FUNCTION) 
{
-        return new ConstEval(evalNode.bind(null).eval(null));
+        return new ConstEval(evalNode.bind(null, null).eval(null));
       } else {
         return evalNode;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java
index 2db4368..eb3e36b 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java
@@ -80,7 +80,7 @@ public class BetweenPredicateEval extends EvalNode implements 
Cloneable {
   }
 
   private static interface Checker {
-    void bind(Schema schema);
+    void bind(EvalContext evalContext, Schema schema);
     Datum eval(Tuple param);
   }
 
@@ -103,8 +103,8 @@ public class BetweenPredicateEval extends EvalNode 
implements Cloneable {
     }
 
     @Override
-    public void bind(Schema schema) {
-      predicand.bind(schema);
+    public void bind(EvalContext evalContext, Schema schema) {
+      predicand.bind(evalContext, schema);
     }
 
     @Override
@@ -134,10 +134,10 @@ public class BetweenPredicateEval extends EvalNode 
implements Cloneable {
     }
 
     @Override
-    public void bind(Schema schema) {
-      predicand.bind(schema);
-      begin.bind(schema);
-      end.bind(schema);
+    public void bind(EvalContext evalContext, Schema schema) {
+      predicand.bind(evalContext, schema);
+      begin.bind(evalContext, schema);
+      end.bind(evalContext, schema);
     }
 
     @Override
@@ -170,10 +170,10 @@ public class BetweenPredicateEval extends EvalNode 
implements Cloneable {
     }
 
     @Override
-    public void bind(Schema schema) {
-      predicand.bind(schema);
-      begin.bind(schema);
-      end.bind(schema);
+    public void bind(EvalContext evalContext, Schema schema) {
+      predicand.bind(evalContext, schema);
+      begin.bind(evalContext, schema);
+      end.bind(evalContext, schema);
     }
 
     @Override
@@ -227,8 +227,8 @@ public class BetweenPredicateEval extends EvalNode 
implements Cloneable {
   }
 
   @Override
-  public EvalNode bind(Schema schema) {
-    super.bind(schema);
+  public EvalNode bind(EvalContext evalContext, Schema schema) {
+    super.bind(evalContext, schema);
     if (begin.getType() == EvalType.CONST && end.getType() == EvalType.CONST) {
       Datum beginValue = ((ConstEval)begin).getValue();
       Datum endValue = ((ConstEval)end).getValue();
@@ -245,7 +245,7 @@ public class BetweenPredicateEval extends EvalNode 
implements Cloneable {
         checker = new AsymmetricChecker(not, predicand, begin, end);
       }
     }
-    checker.bind(schema);
+    checker.bind(evalContext, schema);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java
index f174b79..b8b768b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java
@@ -31,7 +31,7 @@ import java.util.TimeZone;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
-public class CastEval extends UnaryEval {
+public class CastEval extends UnaryEval implements Cloneable {
   @Expose private DataType target;
   @Expose private TimeZone timezone;
 
@@ -97,11 +97,24 @@ public class CastEval extends UnaryEval {
     boolean valid = obj != null && obj instanceof CastEval;
     if (valid) {
       CastEval another = (CastEval) obj;
-      return child.equals(another.child) &&
-          target.equals(another.target) &&
-          TUtil.checkEquals(timezone, another.timezone);
+      boolean b1 = child.equals(another.child);
+      boolean b2 = target.equals(another.target);
+      boolean b3 = TUtil.checkEquals(timezone, another.timezone);
+      return b1 && b2 && b3;
     } else {
       return false;
     }
   }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    CastEval clone = (CastEval) super.clone();
+    if (target != null) {
+      clone.target = target;
+    }
+    if (timezone != null) {
+      clone.timezone = timezone;
+    }
+    return clone;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java
new file mode 100644
index 0000000..6a30e77
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.expr;
+
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class EvalContext {
+  private final Map<EvalNode, TajoScriptEngine> scriptEngineMap = 
TUtil.newHashMap();
+
+  public void addScriptEngine(EvalNode evalNode, TajoScriptEngine 
scriptExecutor) {
+    this.scriptEngineMap.put(evalNode, scriptExecutor);
+  }
+
+  public boolean hasScriptEngine(EvalNode evalNode) {
+    return this.scriptEngineMap.containsKey(evalNode);
+  }
+
+  public TajoScriptEngine getScriptEngine(EvalNode evalNode) {
+    return this.scriptEngineMap.get(evalNode);
+  }
+
+  public Collection<TajoScriptEngine> getAllScriptEngines() {
+    return this.scriptEngineMap.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
index 154b0fd..9abb0bc 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan.expr;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -61,9 +62,9 @@ public abstract class EvalNode implements Cloneable, 
GsonObject, ProtoObject<Pla
     return PlanGsonHelper.toJson(this, EvalNode.class);
        }
 
-  public EvalNode bind(Schema schema) {
+  public EvalNode bind(@Nullable EvalContext evalContext, Schema schema) {
     for (int i = 0; i < childNum(); i++) {
-      getChild(i).bind(schema);
+      getChild(i).bind(evalContext, schema);
     }
     isBinded = true;
     return this;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
index 5e3843c..1fa2fe0 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
@@ -570,8 +570,8 @@ public class EvalTreeUtil {
     return findUniqueColumns(evalNode).size() == 0 && 
findDistinctAggFunction(evalNode).size() == 0;
   }
 
-  public static Datum evaluateImmediately(EvalNode evalNode) {
-    evalNode.bind(null);
+  public static Datum evaluateImmediately(EvalContext evalContext, EvalNode 
evalNode) {
+    evalNode.bind(evalContext, null);
     return evalNode.eval(null);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java
index 83d00b9..870970b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java
@@ -40,8 +40,8 @@ public class FieldEval extends EvalNode implements Cloneable {
        }
 
   @Override
-  public EvalNode bind(Schema schema) {
-    super.bind(schema);
+  public EvalNode bind(EvalContext evalContext, Schema schema) {
+    super.bind(evalContext, schema);
     // TODO - column namespace should be improved to simplify name handling 
and resolving.
     if (column.hasQualifier()) {
       fieldId = schema.getColumnId(column.getQualifiedName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java
index 4ff7548..dd9121b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java
@@ -49,8 +49,8 @@ public abstract class FunctionEval extends EvalNode 
implements Cloneable {
        }
 
   @Override
-  public EvalNode bind(Schema schema) {
-    super.bind(schema);
+  public EvalNode bind(EvalContext evalContext, Schema schema) {
+    super.bind(evalContext, schema);
     this.params = new VTuple(argEvals.length);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java
index 2ce7850..30fbe91 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java
@@ -18,54 +18,56 @@
 
 package org.apache.tajo.plan.expr;
 
-import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.plan.function.GeneralFunction;
+import org.apache.tajo.plan.function.FunctionInvoke;
+import org.apache.tajo.plan.function.FunctionInvokeContext;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.TUtil;
 
-import javax.annotation.Nullable;
+import java.io.IOException;
 
 public class GeneralFunctionEval extends FunctionEval {
-  @Expose protected GeneralFunction instance;
+  protected FunctionInvoke funcInvoke;
+  @Expose protected FunctionInvokeContext invokeContext;
 
-       public GeneralFunctionEval(@Nullable OverridableConf queryContext, 
FunctionDesc desc, GeneralFunction instance,
-                             EvalNode[] givenArgs) {
+       public GeneralFunctionEval(OverridableConf queryContext, FunctionDesc 
desc, EvalNode[] givenArgs)
+      throws IOException {
                super(EvalType.FUNCTION, desc, givenArgs);
-               this.instance = instance;
-    this.instance.init(queryContext, getParamType());
+    this.invokeContext = new FunctionInvokeContext(queryContext, 
getParamType());
+  }
+
+  @Override
+  public EvalNode bind(EvalContext evalContext, Schema schema) {
+    super.bind(evalContext, schema);
+    try {
+      this.funcInvoke = FunctionInvoke.newInstance(funcDesc);
+      if (evalContext != null && evalContext.hasScriptEngine(this)) {
+        this.invokeContext.setScriptEngine(evalContext.getScriptEngine(this));
+      }
+      this.funcInvoke.init(invokeContext);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return this;
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public Datum eval(Tuple tuple) {
     super.eval(tuple);
-    return instance.eval(evalParams(tuple));
+    Datum res = funcInvoke.eval(evalParams(tuple));
+    return res;
   }
 
        @Override
-       public boolean equals(Object obj) {
-         if (obj instanceof GeneralFunctionEval) {
-      GeneralFunctionEval other = (GeneralFunctionEval) obj;
-      return super.equals(other) &&
-          TUtil.checkEquals(instance, other.instance);
-         }
-         
-         return false;
-       }
-       
-       @Override
-       public int hashCode() {
-         return Objects.hashCode(funcDesc, instance);
-       }
-       
-       @Override
   public Object clone() throws CloneNotSupportedException {
     GeneralFunctionEval eval = (GeneralFunctionEval) super.clone();
-    eval.instance = (GeneralFunction) instance.clone();
+    if (funcInvoke != null) {
+      eval.funcInvoke = (FunctionInvoke) funcInvoke.clone();
+    }
     return eval;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
index 6faa667..cdd8dfb 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java
@@ -74,8 +74,8 @@ public abstract class PatternMatchPredicateEval extends 
BinaryEval {
   }
 
   @Override
-  public EvalNode bind(Schema schema) {
-    super.bind(schema);
+  public EvalNode bind(EvalContext evalContext, Schema schema) {
+    super.bind(evalContext, schema);
     compile(pattern);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java
index 7e03224..2440c52 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java
@@ -20,9 +20,10 @@ package org.apache.tajo.plan.exprrewrite;
 
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.annotator.Prioritized;
+import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
 
 @Prioritized
 public interface EvalTreeOptimizationRule {
-  public EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode tree);
+  EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode tree);
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java
index a8a3ff3..680600a 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.annotator.Prioritized;
+import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.util.ClassUtil;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
index d77f072..a8b0945 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
@@ -18,17 +18,23 @@
 
 package org.apache.tajo.plan.exprrewrite.rules;
 
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.exprrewrite.EvalTreeOptimizationRule;
 import org.apache.tajo.plan.annotator.Prioritized;
 import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
 
+import java.io.IOException;
 import java.util.Stack;
 
 @Prioritized(priority = 10)
 public class ConstantFolding extends 
SimpleEvalNodeVisitor<LogicalPlanner.PlanContext>
     implements EvalTreeOptimizationRule {
 
+  private final static String SLEEP_FUNCTION_NAME = "sleep";
+
   @Override
   public EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode 
evalNode) {
     return visit(context, evalNode, new Stack<EvalNode>());
@@ -49,7 +55,7 @@ public class ConstantFolding extends 
SimpleEvalNodeVisitor<LogicalPlanner.PlanCo
     }
 
     if (lhs.getType() == EvalType.CONST && rhs.getType() == EvalType.CONST) {
-      binaryEval.bind(null);
+      binaryEval.bind(null, null);
       return new ConstEval(binaryEval.eval(null));
     }
 
@@ -62,8 +68,9 @@ public class ConstantFolding extends 
SimpleEvalNodeVisitor<LogicalPlanner.PlanCo
     EvalNode child = visit(context, unaryEval.getChild(), stack);
     stack.pop();
 
+    unaryEval.setChild(child);
     if (child.getType() == EvalType.CONST) {
-      unaryEval.bind(null);
+      unaryEval.bind(null, null);
       return new ConstEval(unaryEval.eval(null));
     }
 
@@ -74,7 +81,7 @@ public class ConstantFolding extends 
SimpleEvalNodeVisitor<LogicalPlanner.PlanCo
   public EvalNode visitFuncCall(LogicalPlanner.PlanContext context, 
FunctionEval evalNode, Stack<EvalNode> stack) {
     boolean constantOfAllDescendents = true;
 
-    if ("sleep".equals(evalNode.getFuncDesc().getFunctionName())) {
+    if (SLEEP_FUNCTION_NAME.equals(evalNode.getFuncDesc().getFunctionName())) {
       constantOfAllDescendents = false;
     } else {
       for (EvalNode arg : evalNode.getArgs()) {
@@ -84,8 +91,24 @@ public class ConstantFolding extends 
SimpleEvalNodeVisitor<LogicalPlanner.PlanCo
     }
 
     if (constantOfAllDescendents && evalNode.getType() == EvalType.FUNCTION) {
-      evalNode.bind(null);
-      return new ConstEval(evalNode.eval(null));
+      if (evalNode.getFuncDesc().getInvocation().hasPython()) {
+        TajoScriptEngine executor = new 
PythonScriptEngine(evalNode.getFuncDesc());
+        try {
+          executor.start(context.getQueryContext().getConf());
+          EvalContext evalContext = new EvalContext();
+          evalContext.addScriptEngine(evalNode, executor);
+          evalNode.bind(evalContext, null);
+          Datum funcRes = evalNode.eval(null);
+          executor.shutdown();
+          return new ConstEval(funcRes);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        evalNode.bind(null, null);
+        return new ConstEval(evalNode.eval(null));
+      }
+
     } else {
       return evalNode;
     }

Reply via email to