http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/controller.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py new file mode 100644 index 0000000..d969b34 --- /dev/null +++ b/tajo-core/src/main/resources/python/controller.py @@ -0,0 +1,330 @@ +#! /usr/bin/env python +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Streaming controller for Python user-defined functions.

The controller reads serialized argument records from stdin, calls a single
user function with the deserialized arguments, and writes the serialized
result back to stdout. Records are terminated by ``END_RECORD_DELIM`` and
fields inside a record are separated by ``WRAPPED_FIELD_DELIMITER``.

NOTE: this module targets Python 2 (it relies on ``unicode``, ``long`` and
the ``level=-1`` form of ``__import__``).
"""

import sys
import os
import logging
import base64

from datetime import datetime
try:
    from dateutil import parser
    USE_DATEUTIL = True
except ImportError:
    USE_DATEUTIL = False

from tajo_util import write_user_exception, udf_logging

# --- Wire-format delimiters -------------------------------------------------
FIELD_DELIMITER = ','
TUPLE_START = '('
TUPLE_END = ')'
BAG_START = '{'
BAG_END = '}'
MAP_START = '['
MAP_END = ']'
MAP_KEY = '#'
PARAMETER_DELIMITER = '\t'
PRE_WRAP_DELIM = '|'
POST_WRAP_DELIM = '_'
NULL_BYTE = "-"
END_RECORD_DELIM = '|_\n'
END_RECORD_DELIM_LENGTH = len(END_RECORD_DELIM)

# Every structural delimiter is wrapped as PRE_WRAP_DELIM + char + POST_WRAP_DELIM.
WRAPPED_FIELD_DELIMITER = PRE_WRAP_DELIM + FIELD_DELIMITER + POST_WRAP_DELIM
WRAPPED_TUPLE_START = PRE_WRAP_DELIM + TUPLE_START + POST_WRAP_DELIM
WRAPPED_TUPLE_END = PRE_WRAP_DELIM + TUPLE_END + POST_WRAP_DELIM
WRAPPED_BAG_START = PRE_WRAP_DELIM + BAG_START + POST_WRAP_DELIM
WRAPPED_BAG_END = PRE_WRAP_DELIM + BAG_END + POST_WRAP_DELIM
WRAPPED_MAP_START = PRE_WRAP_DELIM + MAP_START + POST_WRAP_DELIM
WRAPPED_MAP_END = PRE_WRAP_DELIM + MAP_END + POST_WRAP_DELIM
WRAPPED_PARAMETER_DELIMITER = PRE_WRAP_DELIM + PARAMETER_DELIMITER + POST_WRAP_DELIM
WRAPPED_NULL_BYTE = PRE_WRAP_DELIM + NULL_BYTE + POST_WRAP_DELIM

# --- Type flags -------------------------------------------------------------
TYPE_TUPLE = TUPLE_START
TYPE_BAG = BAG_START
TYPE_MAP = MAP_START

TYPE_BOOLEAN = "B"
TYPE_INTEGER = "I"
TYPE_LONG = "L"
TYPE_FLOAT = "F"
TYPE_DOUBLE = "D"
TYPE_BYTEARRAY = "A"
TYPE_CHARARRAY = "C"
TYPE_DATETIME = "T"
TYPE_BIGINTEGER = "N"
TYPE_BIGDECIMAL = "E"

# --- Control records --------------------------------------------------------
END_OF_STREAM = TYPE_CHARARRAY + "\x04" + END_RECORD_DELIM
TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + "TURN_ON_OUTPUT_CAPTURING" + END_RECORD_DELIM
# Number of lines to subtract from reported line numbers in user tracebacks.
NUM_LINES_OFFSET_TRACE = int(os.environ.get('PYTHON_TRACE_OFFSET', 0))

# Length of "yyyy-MM-ddTHH:mm:ss.SSS"; anything after it is the time zone.
_DATETIME_PREFIX_LENGTH = 23


class PythonStreamingController:
    """Drives one user function over a stream of serialized input records."""

    def __init__(self, profiling_mode=False):
        """Store the profiling flag; streams are opened later by ``main``."""
        self.profiling_mode = profiling_mode

    def main(self,
             module_name, file_path, func_name, cache_path,
             output_stream_path, error_stream_path, log_file_name, output_schema):
        """Run the read / call / write loop until the end-of-stream record.

        Args:
            module_name: name of the module that defines the user function.
            file_path: directory appended to ``sys.path`` to find the module.
            func_name: name of the function to call for every record.
            cache_path: additional directory appended to ``sys.path``.
            output_stream_path: currently unused (see the TODO below).
            error_stream_path: currently unused (see the TODO below).
            log_file_name: log file, only used when logging is switched on.
            output_schema: passed through to ``serialize_output``.

        Exit codes (via ``close_controller`` / ``sys.exit``): -1 when the
        user function cannot be imported, -2 when the user function raises,
        -3 when the input cannot be deserialized or the controller itself
        fails.
        """
        sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)

        # Need to ensure that user functions can't write to the streams we
        # use to communicate with Tajo.
        self.stream_output = os.fdopen(sys.stdout.fileno(), 'wb', 0)
        self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0)

        self.input_stream = sys.stdin
        # TODO: support controller logging
        # self.log_stream = open(output_stream_path, 'a')
        # sys.stderr = open(error_stream_path, 'w')

        sys.path.append(file_path)
        sys.path.append(cache_path)
        sys.path.append('.')

        should_log = False
        if should_log:
            logging.basicConfig(filename=log_file_name,
                                format="%(asctime)s %(levelname)s %(message)s",
                                level=udf_logging.udf_log_level)
            logging.info("To reduce the amount of information being logged only a small subset of rows are logged at the "
                         "INFO level. Call udf_logging.set_log_level_debug in tajo_util to see all rows being processed.")

        input_str = self.get_next_input()

        try:
            func = __import__(module_name, globals(), locals(), [func_name], -1).__dict__[func_name]
        except:
            # These errors should always be caused by user code.
            write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE)
            self.close_controller(-1)

        log_message = logging.info
        if udf_logging.udf_log_level == logging.DEBUG:
            log_message = logging.debug

        while input_str != END_OF_STREAM:
            try:
                try:
                    if should_log:
                        log_message("Serialized Input: %s" % (input_str))
                    inputs = deserialize_input(input_str)
                    if should_log:
                        log_message("Deserialized Input: %s" % (unicode(inputs)))
                except:
                    # Capture errors where the user passes in bad data.
                    write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE)
                    self.close_controller(-3)

                try:
                    func_output = func(*inputs)
                    if should_log:
                        log_message("UDF Output: %s" % (unicode(func_output)))
                except:
                    # These errors should always be caused by user code.
                    write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE)
                    self.close_controller(-2)

                output = serialize_output(func_output, output_schema)
                if should_log:
                    log_message("Serialized Output: %s" % (output))

                self.stream_output.write("%s%s" % (output, END_RECORD_DELIM))
            except Exception:
                # This should only catch internal exceptions with the
                # controller and Tajo - not with user code. SystemExit raised
                # by close_controller is not an Exception and passes through.
                import traceback
                traceback.print_exc(file=self.stream_error)
                sys.exit(-3)

            sys.stdout.flush()
            sys.stderr.flush()
            self.stream_output.flush()
            self.stream_error.flush()

            input_str = self.get_next_input()

    def get_next_input(self):
        """Read one record from the input stream.

        Lines are accumulated until the text ends with ``END_RECORD_DELIM``.

        Returns:
            The record without its trailing delimiter, or ``END_OF_STREAM``
            when the stream is exhausted or the end-of-stream record is read.
        """
        input_stream = self.input_stream

        input_str = input_stream.readline()

        while not input_str.endswith(END_RECORD_DELIM):
            line = input_stream.readline()
            if not line:
                # EOF in the middle of a record: discard the partial record.
                input_str = ''
                break
            input_str += line

        if not input_str:
            return END_OF_STREAM

        if input_str == END_OF_STREAM:
            return input_str

        return input_str[:-END_RECORD_DELIM_LENGTH]

    def close_controller(self, exit_code):
        """Terminate both error and output streams, then exit the process.

        Args:
            exit_code: status passed to ``sys.exit``.
        """
        sys.stderr.close()
        self.stream_error.write("\n")
        self.stream_error.close()
        sys.stdout.close()
        self.stream_output.write("\n")
        self.stream_output.close()
        sys.exit(exit_code)


def deserialize_input(input_str):
    """Split a record into fields and deserialize each one.

    Args:
        input_str: one record, already stripped of ``END_RECORD_DELIM``.

    Returns:
        A list with one Python value per field; empty for an empty record.
    """
    if not input_str:
        return []

    return [_deserialize_input(param, 0, len(param))
            for param in input_str.split(WRAPPED_FIELD_DELIMITER)]


def _deserialize_input(input_str, si, ei):
    """Convert one serialized field into a Python value.

    A field has the form ``<type flag><WRAPPED_PARAMETER_DELIMITER><value>``.

    Args:
        input_str: the serialized field.
        si: start index, only used for the empty-slice sanity checks.
        ei: end index, only used for the empty-slice sanity checks.

    Returns:
        ``None`` for the null flag, otherwise the value converted according
        to its type flag.

    Raises:
        Exception: when the slice is empty or inverted, when a non-null flag
            carries no value, or when the type flag is unknown.
    """
    length = ei - si + 1
    if length < 1:
        # Handle all of the cases where you can have valid empty input.
        if ei == si:
            if input_str[si] == TYPE_CHARARRAY:
                return u""
            elif input_str[si] == TYPE_BYTEARRAY:
                return bytearray("")
            else:
                raise Exception("Got input type flag %s, but no data to go with it.\nInput string: %s\nSlice: %s" % (input_str[si], input_str, input_str[si:ei+1]))
        else:
            # The message has four placeholders; the input string itself was
            # previously missing from the argument tuple.
            raise Exception("Start index %d greater than end index %d.\nInput string: %s\n, Slice: %s" % (si, ei, input_str, input_str[si:ei+1]))

    # Split only on the first delimiter so a value that happens to contain
    # the delimiter sequence is not truncated.
    tokens = input_str.split(WRAPPED_PARAMETER_DELIMITER, 1)
    schema = tokens[0]

    # Check for null before touching the payload: a null field needs none.
    if schema == NULL_BYTE:
        return None

    if len(tokens) < 2:
        raise Exception("Got input type flag %s, but no data to go with it.\nInput string: %s" % (schema, input_str))
    param = tokens[1]

    if schema == TYPE_CHARARRAY:
        return unicode(param, 'utf-8')
    elif schema == TYPE_BYTEARRAY:
        return bytearray(param)
    elif schema == TYPE_INTEGER:
        return int(param)
    elif schema == TYPE_LONG or schema == TYPE_BIGINTEGER:
        return long(param)
    elif schema == TYPE_FLOAT or schema == TYPE_DOUBLE or schema == TYPE_BIGDECIMAL:
        return float(param)
    elif schema == TYPE_BOOLEAN:
        return param == "true"
    elif schema == TYPE_DATETIME:
        # Format is "yyyy-MM-ddTHH:mm:ss.SSS+00:00" or "2013-08-23T18:14:03.123+ZZ"
        if USE_DATEUTIL:
            return parser.parse(param)
        else:
            # Try to use datetime even though it doesn't handle time zones
            # properly. Only the first 23 characters are parsed, which drops
            # the time zone suffix that strptime would otherwise reject.
            return datetime.strptime(param[:_DATETIME_PREFIX_LENGTH], "%Y-%m-%dT%H:%M:%S.%f")
    else:
        raise Exception("Can't determine type of input: %s" % param)


def _deserialize_collection(input_str, return_type, si, ei):
    """Deserialize a tuple, bag or map found in ``input_str[si:ei+1]``.

    NOTE(review): nothing in this module calls this function, and
    ``_deserialize_input`` no longer honours ``si``/``ei`` when parsing, so
    nested values are presumably not decoded correctly yet - confirm before
    wiring this in.

    Args:
        input_str: serialized text containing the collection.
        return_type: ``TYPE_MAP``, ``TYPE_TUPLE`` or ``TYPE_BAG``.
        si: index of the first character of the collection body.
        ei: index of the last character of the collection body.

    Returns:
        A dict for maps, a tuple for tuples, otherwise a list.
    """
    list_result = []
    append_to_list_result = list_result.append
    dict_result = {}

    index = si
    field_start = si
    # Initialised so the final map entry cannot hit an unbound local.
    value_start = si
    depth = 0

    key = None

    # recurse to deserialize elements if the collection is not empty
    if ei - si + 1 > 0:
        while True:
            if index >= ei - 2:
                if return_type == TYPE_MAP:
                    dict_result[key] = _deserialize_input(input_str, value_start, ei)
                else:
                    append_to_list_result(_deserialize_input(input_str, field_start, ei))
                break

            if return_type == TYPE_MAP and not key:
                key_index = input_str.find(MAP_KEY, index)
                key = unicode(input_str[index+1:key_index], 'utf-8')
                index = key_index + 1
                value_start = key_index + 1
                continue

            if not (input_str[index] == PRE_WRAP_DELIM and input_str[index+2] == POST_WRAP_DELIM):
                # Skip ahead to the next candidate delimiter, or to the end
                # of the slice when there is none (was the undefined name
                # `end_index`).
                prewrap_index = input_str.find(PRE_WRAP_DELIM, index+1)
                index = (prewrap_index if prewrap_index != -1 else ei)
                continue

            mid = input_str[index+1]

            if mid == BAG_START or mid == TUPLE_START or mid == MAP_START:
                depth += 1
            elif mid == BAG_END or mid == TUPLE_END or mid == MAP_END:
                depth -= 1
            elif depth == 0 and mid == FIELD_DELIMITER:
                if return_type == TYPE_MAP:
                    dict_result[key] = _deserialize_input(input_str, value_start, index - 1)
                    key = None
                else:
                    append_to_list_result(_deserialize_input(input_str, field_start, index - 1))
                field_start = index + 3

            index += 3

    if return_type == TYPE_MAP:
        return dict_result
    elif return_type == TYPE_TUPLE:
        return tuple(list_result)
    else:
        return list_result


def wrap_tuple(o, serialized_item):
    """Wrap ``serialized_item`` in tuple delimiters unless ``o`` is a tuple."""
    if type(o) != tuple:
        return WRAPPED_TUPLE_START + serialized_item + WRAPPED_TUPLE_END
    else:
        return serialized_item


def serialize_output(output, out_schema, utfEncodeAllFields=False):
    """Serialize the value returned by a user function.

    Args:
        output: value returned by the user function.
        out_schema: output type name; ``"blob"`` results are base64 encoded.
        utfEncodeAllFields: when true, every value is UTF-8 encoded, not
            only strings. Generally we want to utf encode only strings, but
            for maps we utf encode everything because on the Java side we
            don't know the schema for maps, so we wouldn't be able to tell
            which fields were encoded or not.

    Returns:
        The serialized representation of ``output``.
    """
    output_type = type(output)

    if output is None:
        result = WRAPPED_NULL_BYTE
    elif output_type == bool:
        result = ("true" if output else "false")
    elif output_type == bytearray:
        result = str(output)
    elif output_type == datetime:
        result = output.isoformat()
    elif utfEncodeAllFields or output_type == str or output_type == unicode:
        # unicode is necessary in cases where we're encoding non-strings.
        result = unicode(output).encode('utf-8')
    else:
        result = str(output)

    if out_schema == "blob":
        return base64.b64encode(result)
    else:
        return result


if __name__ == '__main__':
    controller = PythonStreamingController()
    controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4],
                    sys.argv[5], sys.argv[6], sys.argv[7], sys.argv[8])
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/tajo_util.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/python/tajo_util.py b/tajo-core/src/main/resources/python/tajo_util.py new file mode 100644 index 0000000..77b28a6 --- /dev/null +++ b/tajo-core/src/main/resources/python/tajo_util.py @@ -0,0 +1,103 @@ +#!/usr/bin/python + +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +class udf_logging(object): + udf_log_level = logging.INFO + + @classmethod + def set_log_level_error(cls): + cls.udf_log_level = logging.ERROR + + @classmethod + def set_log_level_warn(cls): + cls.udf_log_level = logging.WARN + + @classmethod + def set_log_level_info(cls): + cls.udf_log_level = logging.INFO + + @classmethod + def set_log_level_debug(cls): + cls.udf_log_level = logging.DEBUG + +def outputType(type_str): + def wrap(f): + def wrapped_f(*args): + return f(*args) + return wrapped_f + return wrap + +def write_user_exception(filename, stream_err_output, num_lines_offset_trace=0): + import sys + import traceback + import inspect + (t, v, tb) = sys.exc_info() + name = t.__name__ + record_error = False + + if name in ['SyntaxError', 'IndentationError']: + syntax_error_values = v.args + user_line_number = syntax_error_values[1][1] - num_lines_offset_trace + error_message = "%s: %s\n\tFile: %s, line %s column %s\n\t%s" % \ + (name, + syntax_error_values[0], + syntax_error_values[1][0], + user_line_number, + syntax_error_values[1][2], + syntax_error_values[1][3]) + else: + error_message = "%s: %s\n" % (name, v) + user_line_number = None + while 1: + e_file_name = tb.tb_frame.f_code.co_filename + if e_file_name.find(filename) > 0: + record_error = True + if not record_error: + if not tb.tb_next: + break + tb = tb.tb_next + continue + + line_number = tb.tb_lineno + mod = inspect.getmodule(tb) + if mod: + lines, offset = inspect.getsourcelines(mod) + line = lines[line_number - offset - 1] + else: + #Useful to catch exceptions with an invalid module (like syntax + #errors) + lines, offset = inspect.getsourcelines(tb.tb_frame) + if (line_number - 1) >= len(lines): + line = "Unknown Line" + else: + line = lines[line_number - 1] + + user_line_number = line_number - num_lines_offset_trace + func_name = tb.tb_frame.f_code.co_name + error_message += 'File %s, line %s, in %s\n\t%s\n' % \ + (e_file_name, user_line_number, func_name, line) + if not 
tb.tb_next: + break + tb = tb.tb_next + if name in ['UnicodeEncodeError']: + error_message += "\nTo print a unicode string in your udf use encode('utf-8'). Example: \n\tprint 'Example'.encode('utf-8')" + if user_line_number: + stream_err_output.write("%s\n" % user_line_number) + stream_err_output.write("%s\n" % error_message) http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index e680736..54f9e92 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -57,6 +57,7 @@ import java.io.File; import java.io.IOException; import java.io.Writer; import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; @@ -160,6 +161,8 @@ public class TajoTestingCluster { // Memory cache termination conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); + conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); + /* Since Travi CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index e2dac05..36ffd0c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -36,7 +36,10 @@ import org.apache.tajo.engine.codegen.TajoClassLoader; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.master.exec.QueryExecutor; import org.apache.tajo.plan.*; +import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.serder.EvalNodeDeserializer; import org.apache.tajo.plan.serder.EvalNodeSerializer; @@ -58,6 +61,7 @@ import org.junit.BeforeClass; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.TimeZone; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; @@ -91,7 +95,9 @@ public class ExprTestBase { cat = util.getMiniCatalogCluster().getCatalog(); cat.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:1234/warehouse"); cat.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - for (FunctionDesc funcDesc : FunctionLoader.load()) { + Map<FunctionSignature, FunctionDesc> map = FunctionLoader.load(); + map = FunctionLoader.loadUserDefinedFunctions(conf, map); + for (FunctionDesc funcDesc : map.values()) { cat.createFunction(funcDesc); } @@ -126,8 +132,8 @@ public class ExprTestBase { * @return * @throws PlanningException */ - private static Target[] getRawTargets(QueryContext context, String query, boolean condition) throws PlanningException, - InvalidStatementException { + private static Target[] getRawTargets(QueryContext context, String query, boolean condition) + throws PlanningException, InvalidStatementException { List<ParsedResult> parsedResults = SimpleParser.parseScript(query); if (parsedResults.size() > 1) { @@ -265,6 +271,7 @@ public class ExprTestBase { Target [] targets; TajoClassLoader classLoader = new TajoClassLoader(); + EvalContext evalContext 
= new EvalContext(); try { targets = getRawTargets(queryContext, query, condition); @@ -274,6 +281,7 @@ public class ExprTestBase { codegen = new EvalCodeGenerator(classLoader); } + QueryExecutor.startScriptExecutors(queryContext, evalContext, targets); Tuple outTuple = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); @@ -281,7 +289,7 @@ public class ExprTestBase { if (queryContext.getBool(SessionVars.CODEGEN)) { eval = codegen.compile(inputSchema, eval); } - eval.bind(inputSchema); + eval.bind(evalContext, inputSchema); outTuple.put(i, eval.eval(vtuple)); } @@ -318,11 +326,12 @@ public class ExprTestBase { if (schema != null) { cat.dropTable(qualifiedTableName); } + QueryExecutor.stopScriptExecutors(evalContext); } } public static void assertEvalTreeProtoSerDer(OverridableConf context, EvalNode evalNode) { PlanProto.EvalNodeTree converted = EvalNodeSerializer.serialize(evalNode); - assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, converted)); + assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, null, converted)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java index a2d0598..ea42783 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java @@ -48,7 +48,7 @@ public class TestEvalTree extends ExprTestBase { schema1.addColumn("table1.score", INT4); BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2); - expr.bind(schema1); + expr.bind(null, schema1); assertCloneEqual(expr); VTuple tuple = new VTuple(2); @@ -161,19 +161,19 @@ public class TestEvalTree extends 
ExprTestBase { MockFalseExpr falseExpr = new MockFalseExpr(); BinaryEval andExpr = new BinaryEval(EvalType.AND, trueExpr, trueExpr); - andExpr.bind(null); + andExpr.bind(null, null); assertTrue(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, falseExpr, trueExpr); - andExpr.bind(null); + andExpr.bind(null, null); assertFalse(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, trueExpr, falseExpr); - andExpr.bind(null); + andExpr.bind(null, null); assertFalse(andExpr.eval(null).asBool()); andExpr = new BinaryEval(EvalType.AND, falseExpr, falseExpr); - andExpr.bind(null); + andExpr.bind(null, null); assertFalse(andExpr.eval(null).asBool()); } @@ -183,19 +183,19 @@ public class TestEvalTree extends ExprTestBase { MockFalseExpr falseExpr = new MockFalseExpr(); BinaryEval orExpr = new BinaryEval(EvalType.OR, trueExpr, trueExpr); - orExpr.bind(null); + orExpr.bind(null, null); assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, falseExpr, trueExpr); - orExpr.bind(null); + orExpr.bind(null, null); assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, trueExpr, falseExpr); - orExpr.bind(null); + orExpr.bind(null, null); assertTrue(orExpr.eval(null).asBool()); orExpr = new BinaryEval(EvalType.OR, falseExpr, falseExpr); - orExpr.bind(null); + orExpr.bind(null, null); assertFalse(orExpr.eval(null).asBool()); } @@ -209,41 +209,41 @@ public class TestEvalTree extends ExprTestBase { e1 = new ConstEval(DatumFactory.createInt4(9)); e2 = new ConstEval(DatumFactory.createInt4(34)); expr = new BinaryEval(EvalType.LTH, e1, e2); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e1, e2); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LTH, e2, e1); - assertFalse(expr.bind(null).eval(null).asBool()); + 
assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e2, e1); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e2, e1); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, e2, e1); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e1, e2); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, e1, e2); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); BinaryEval plus = new BinaryEval(EvalType.PLUS, e1, e2); expr = new BinaryEval(EvalType.LTH, e1, plus); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, e1, plus); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LTH, plus, e1); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.LEQ, plus, e1); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, plus, e1); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GEQ, plus, e1); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); expr = new BinaryEval(EvalType.GTH, e1, plus); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); expr = new 
BinaryEval(EvalType.GEQ, e1, plus); - assertFalse(expr.bind(null).eval(null).asBool()); + assertFalse(expr.bind(null, null).eval(null).asBool()); } @Test @@ -256,28 +256,28 @@ public class TestEvalTree extends ExprTestBase { e1 = new ConstEval(DatumFactory.createInt4(9)); e2 = new ConstEval(DatumFactory.createInt4(34)); BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2); - assertEquals(expr.bind(null).eval(null).asInt4(), 43); + assertEquals(expr.bind(null, null).eval(null).asInt4(), 43); assertCloneEqual(expr); // MINUS e1 = new ConstEval(DatumFactory.createInt4(5)); e2 = new ConstEval(DatumFactory.createInt4(2)); expr = new BinaryEval(EvalType.MINUS, e1, e2); - assertEquals(expr.bind(null).eval(null).asInt4(), 3); + assertEquals(expr.bind(null, null).eval(null).asInt4(), 3); assertCloneEqual(expr); // MULTIPLY e1 = new ConstEval(DatumFactory.createInt4(5)); e2 = new ConstEval(DatumFactory.createInt4(2)); expr = new BinaryEval(EvalType.MULTIPLY, e1, e2); - assertEquals(expr.bind(null).eval(null).asInt4(), 10); + assertEquals(expr.bind(null, null).eval(null).asInt4(), 10); assertCloneEqual(expr); // DIVIDE e1 = new ConstEval(DatumFactory.createInt4(10)); e2 = new ConstEval(DatumFactory.createInt4(5)); expr = new BinaryEval(EvalType.DIVIDE, e1, e2); - assertEquals(expr.bind(null).eval(null).asInt4(), 2); + assertEquals(expr.bind(null, null).eval(null).asInt4(), 2); assertCloneEqual(expr); } @@ -293,7 +293,7 @@ public class TestEvalTree extends ExprTestBase { assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType()); expr = new BinaryEval(EvalType.LTH, e1, e2); - assertTrue(expr.bind(null).eval(null).asBool()); + assertTrue(expr.bind(null, null).eval(null).asBool()); assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType()); e1 = new ConstEval(DatumFactory.createFloat8(9.3)); @@ -384,7 +384,7 @@ public class TestEvalTree extends ExprTestBase { binEval.eval(null); fail("EvalNode is not binded"); } catch (IllegalStateException e) { - 
assertTrue(binEval.bind(null).eval(null).asBool()); + assertTrue(binEval.bind(null, null).eval(null).asBool()); } CaseWhenEval caseWhenEval = new CaseWhenEval(); @@ -393,7 +393,7 @@ public class TestEvalTree extends ExprTestBase { caseWhenEval.eval(null); fail("EvalNode is not binded"); } catch (IllegalStateException e) { - assertEquals(caseWhenEval.bind(null).eval(null).asInt4(), 1); + assertEquals(caseWhenEval.bind(null, null).eval(null).asInt4(), 1); } Schema schema = new Schema(new Column[]{new Column("test", TajoDataTypes.Type.INT4)}); @@ -404,7 +404,7 @@ public class TestEvalTree extends ExprTestBase { regexEval.eval(null); fail("EvalNode is not binded"); } catch (IllegalStateException e) { - assertEquals(regexEval.bind(schema).eval(tuple).asBool(), true); + assertEquals(regexEval.bind(null, schema).eval(tuple).asBool(), true); } RowConstantEval rowConstantEval = new RowConstantEval(new Datum[]{}); @@ -412,7 +412,7 @@ public class TestEvalTree extends ExprTestBase { rowConstantEval.eval(null); fail("EvalNode is not binded"); } catch (IllegalStateException e) { - assertEquals(rowConstantEval.bind(null).eval(null).isNull(), true); + assertEquals(rowConstantEval.bind(null, null).eval(null).isNull(), true); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java index e23a34b..0466a24 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java @@ -265,12 +265,12 @@ public class TestEvalTreeUtil { FieldEval field = first.getLeftExpr(); assertEquals(col1, field.getColumnRef()); assertEquals(EvalType.LTH, first.getType()); - assertEquals(10, 
first.getRightExpr().bind(null).eval(null).asInt4()); + assertEquals(10, first.getRightExpr().bind(null, null).eval(null).asInt4()); field = second.getRightExpr(); assertEquals(col1, field.getColumnRef()); assertEquals(EvalType.LTH, second.getType()); - assertEquals(4, second.getLeftExpr().bind(null).eval(null).asInt4()); + assertEquals(4, second.getLeftExpr().bind(null, null).eval(null).asInt4()); } @Test @@ -304,10 +304,10 @@ public class TestEvalTreeUtil { Target [] targets = getRawTargets(QUERIES[0]); EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree()); assertEquals(EvalType.CONST, node.getType()); - assertEquals(7, node.bind(null).eval(null).asInt4()); + assertEquals(7, node.bind(null, null).eval(null).asInt4()); node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree()); assertEquals(EvalType.CONST, node.getType()); - assertTrue(7.0d == node.bind(null).eval(null).asFloat8()); + assertTrue(7.0d == node.bind(null, null).eval(null).asFloat8()); Expr expr = analyzer.parse(QUERIES[1]); LogicalPlan plan = planner.createPlan(defaultContext, expr, true); @@ -335,7 +335,7 @@ public class TestEvalTreeUtil { assertEquals(EvalType.GTH, transposed.getType()); FieldEval field = transposed.getLeftExpr(); assertEquals(col1, field.getColumnRef()); - assertEquals(1, transposed.getRightExpr().bind(null).eval(null).asInt4()); + assertEquals(1, transposed.getRightExpr().bind(null, null).eval(null).asInt4()); node = getRootSelection(QUERIES[4]); // we expect that score < 3 @@ -343,7 +343,7 @@ public class TestEvalTreeUtil { assertEquals(EvalType.LTH, transposed.getType()); field = transposed.getLeftExpr(); assertEquals(col1, field.getColumnRef()); - assertEquals(2, transposed.getRightExpr().bind(null).eval(null).asInt4()); + assertEquals(2, transposed.getRightExpr().bind(null, null).eval(null).asInt4()); } @Test 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java index 49ef7e0..78509f7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestMathFunctions.java @@ -394,9 +394,9 @@ public class TestMathFunctions extends ExprTestBase { @Test public void testPow() throws IOException { - testSimpleEval("select pow(9,3) as col1 ", new String[]{String.valueOf(Math.pow(9,3))}); - testSimpleEval("select pow(1.0,3) as col1 ", new String[]{String.valueOf(Math.pow(1.0,3))}); - testSimpleEval("select pow(20.1,3.1) as col1 ", new String[]{String.valueOf(Math.pow(20.1,3.1))}); + testSimpleEval("select pow(9,3) as col1 ", new String[]{String.valueOf(Math.pow(9, 3))}); + testSimpleEval("select pow(1.0,3) as col1 ", new String[]{String.valueOf(Math.pow(1.0, 3))}); + testSimpleEval("select pow(20.1,3.1) as col1 ", new String[]{String.valueOf(Math.pow(20.1, 3.1))}); testSimpleEval("select pow(null,3.1) as col1 ", new String[]{""}); testSimpleEval("select pow(20.1,null) as col1 ", new String[]{""}); @@ -408,8 +408,8 @@ public class TestMathFunctions extends ExprTestBase { testEval(schema, "table1", "0.4,2.7,3,2", "select pow(col1, col2), pow(col3, col4) from table1", new String[]{ - String.valueOf(Math.pow((float)0.4, 2.7)), - String.valueOf(Math.pow(3,2)) + String.valueOf(Math.pow((float) 0.4, 2.7)), + String.valueOf(Math.pow(3, 2)) }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java new file mode 100644 index 0000000..47a0ad2 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestPythonFunctions.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.function; + +import org.apache.tajo.engine.eval.ExprTestBase; +import org.junit.Test; + +import java.io.IOException; + +public class TestPythonFunctions extends ExprTestBase { + + @Test + public void testFunctions() throws IOException { + testSimpleEval("select return_one()", new String[]{"1"}); + testSimpleEval("select helloworld()", new String[]{"Hello, World"}); + testSimpleEval("select concat_py('1')", new String[]{"11"}); + testSimpleEval("select comma_format(12345)", new String[]{"12,345"}); + testSimpleEval("select sum_py(1,2)", new String[]{"3"}); + testSimpleEval("select percent(386, 1000)", new String[]{"38.6"}); + testSimpleEval("select concat4('Tajo', 'is', 'awesome', '!')", new String[]{"Tajo is awesome !"}); + } + + @Test + public void testNestedFunctions() throws IOException { + testSimpleEval("select sum_py(3, return_one())", new String[]{"4"}); + testSimpleEval("select concat_py(helloworld())", new String[]{"Hello, WorldHello, World"}); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index d1756e1..15a1c9f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -799,4 +799,18 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testGroupbyWithPythonFunc() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testGroupbyWithPythonFunc2() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + 
cleanupQuery(res); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index 513868b..002f05d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -710,6 +710,27 @@ public class TestSelectQuery extends QueryTestCaseBase { } @Test + public void testSelectPythonFuncs() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public void testSelectWithPredicateOnPythonFunc() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public void testNestedPythonFunction() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test public void testSelectWithParentheses1() throws Exception { ResultSet res = executeQuery(); assertResultSet(res); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/__init__.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/python/__init__.py b/tajo-core/src/test/resources/python/__init__.py new file mode 100644 index 0000000..8093a2f --- /dev/null +++ b/tajo-core/src/test/resources/python/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/python + +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/python/test_funcs.py b/tajo-core/src/test/resources/python/test_funcs.py new file mode 100644 index 0000000..d6b7db5 --- /dev/null +++ b/tajo-core/src/test/resources/python/test_funcs.py @@ -0,0 +1,33 @@ +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from tajo_util import outputType + +@outputType('int4') +def return_one(): + return 1 + +@outputType("text") +def helloworld(): + return 'Hello, World' + +# No decorator - blob +def concat_py(str): + return str+str + +@outputType('int4') +def sum_py(a,b): + return a+b http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs.pyc ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/python/test_funcs.pyc b/tajo-core/src/test/resources/python/test_funcs.pyc new file mode 100644 index 0000000..cc84dc1 Binary files /dev/null and b/tajo-core/src/test/resources/python/test_funcs.pyc differ http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/python/test_funcs2.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/python/test_funcs2.py b/tajo-core/src/test/resources/python/test_funcs2.py new file mode 100644 index 0000000..e8db7b5 --- /dev/null +++ b/tajo-core/src/test/resources/python/test_funcs2.py @@ -0,0 +1,32 @@ +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from tajo_util import outputType + +#Percent- Percentage +@outputType("float8") +def percent(num, total): + return num * 100 / float(total) + +#commaFormat- format a number with commas, 12345-> 12,345 +@outputType("text") +def comma_format(num): + return '{:,}'.format(num) + +#concatMultiple- concat multiple words +@outputType("text") +def concat4(word1, word2, word3, word4): + return word1 + " " + word2 + " " + word3 + " " + word4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql new file mode 100644 index 0000000..888552a --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc.sql @@ -0,0 +1 @@ +select count(*) from nation where sum_py(n_nationkey, 1) > 2 group by n_regionkey \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql new file mode 100644 index 0000000..bcfce13 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithPythonFunc2.sql @@ -0,0 +1 @@ +select n_regionkey, count(*) as cnt from nation group by n_regionkey having percent(cnt, 25) > 10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql new file mode 100644 index 0000000..75b33ae --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testNestedPythonFunction.sql @@ -0,0 +1 @@ +select * from nation where sum_py(n_regionkey, return_one()) < 2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql new file mode 100644 index 0000000..bcb9806 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectPythonFuncs.sql @@ -0,0 +1,2 @@ +select helloworld(), sum_py(n_nationkey, n_regionkey) as sum, concat_py(n_name) as concat +from nation where n_nationkey < 5 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql new file mode 100644 index 0000000..d2c5082 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectWithPredicateOnPythonFunc.sql @@ -0,0 +1 @@ +select * from nation where sum_py(n_regionkey,1) > 2 \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result new file mode 100644 index 0000000..2a5fb8a --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result @@ -0,0 +1,7 @@ +?count +------------------------------- +4 +5 +5 +5 +4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result new file mode 100644 index 0000000..08561fd --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result @@ -0,0 +1,7 @@ +n_regionkey,cnt +------------------------------- +0,5 +3,5 +4,5 +2,5 +1,5 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result b/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result new file mode 100644 index 0000000..899f034 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testNestedPythonFunction.result @@ -0,0 +1,7 @@ +n_nationkey,n_name,n_regionkey,n_comment +------------------------------- +0,ALGERIA,0, haggle. 
carefully final deposits detect slyly agai +5,ETHIOPIA,0,ven packages wake quickly. regu +14,KENYA,0, pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t +15,MOROCCO,0,rns. blithely bold courts among the closely regular packages use furiously bold platelets? +16,MOZAMBIQUE,0,s. ironic, unusual asymptotes wake blithely r \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result new file mode 100644 index 0000000..877e9fc --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectPythonFuncs.result @@ -0,0 +1,7 @@ +?helloworld,sum,concat +------------------------------- +Hello, World,0,ALGERIAALGERIA +Hello, World,2,ARGENTINAARGENTINA +Hello, World,3,BRAZILBRAZIL +Hello, World,4,CANADACANADA +Hello, World,8,EGYPTEGYPT \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result new file mode 100644 index 0000000..6d533b4 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectWithPredicateOnPythonFunc.result @@ -0,0 +1,17 @@ +n_nationkey,n_name,n_regionkey,n_comment +------------------------------- +4,EGYPT,4,y above the carefully unusual theodolites. 
final dugouts are quickly across the furiously regular d +6,FRANCE,3,refully final requests. regular, ironi +7,GERMANY,3,l platelets. regular accounts x-ray: unusual, regular acco +8,INDIA,2,ss excuses cajole slyly across the packages. deposits print aroun +9,INDONESIA,2, slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull +10,IRAN,4,efully alongside of the slyly final dependencies. +11,IRAQ,4,nic deposits boost atop the quickly final requests? quickly regula +12,JAPAN,2,ously. final, express gifts cajole a +13,JORDAN,4,ic deposits are blithely about the carefully regular pa +18,CHINA,2,c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos +19,ROMANIA,3,ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account +20,SAUDI ARABIA,4,ts. silent requests haggle. closely express packages sleep across the blithely +21,VIETNAM,2,hely enticingly express accounts. even, final +22,RUSSIA,3, requests against the platelets use never according to the quickly regular pint +23,UNITED KINGDOM,3,eans boost carefully special requests. accounts are. carefull \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-docs/src/main/sphinx/functions.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/functions.rst b/tajo-docs/src/main/sphinx/functions.rst index 453edf4..8200bfd 100644 --- a/tajo-docs/src/main/sphinx/functions.rst +++ b/tajo-docs/src/main/sphinx/functions.rst @@ -2,6 +2,12 @@ Functions ****************** +Tajo provides extensive supports for functions. It includes a lot of built-in functions and user-defined functions which is implemented in Python. + +=================== +Built-in Functions +=================== + .. 
toctree:: :maxdepth: 1 @@ -9,4 +15,66 @@ Functions functions/string_func_and_operators functions/datetime_func_and_operators functions/network_func_and_operators - functions/json_func \ No newline at end of file + functions/json_func + +============================== +Python User-defined Functions +============================== + +----------------------- +Function registration +----------------------- + +To register Python UDFs, you must install script files in all cluster nodes. +After that, you can register your functions by specifying the paths to those script files in ``tajo-site.xml``. Here is an example of the configuration. + +.. code-block:: xml + + <property> + <name>tajo.function.python.code-dir</name> + <value>/path/to/script1.py,/path/to/script2.py</value> + </property> + +Please note that you can specify multiple paths with ``','`` as a delimiter. Each file can contain multiple functions. Here is a typical example of a script file. + +.. code-block:: python + + # /path/to/script1.py + + @outputType('int4') + def return_one(): + return 1 + + @outputType("text") + def helloworld(): + return 'Hello, World' + + # No decorator - blob + def concat_py(str): + return str+str + + @outputType('int4') + def sum_py(a,b): + return a+b + +If the configuration is set properly, every function in the script files are registered when the Tajo cluster starts up. + +----------------------- +Decorators and types +----------------------- + +By default, every function has a return type of ``BLOB``. +You can use Python decorators to define output types for the script functions. Tajo can figure out return types from the annotations of the Python script. + +* ``outputType``: Defines the return data type for a script UDF in a format that Tajo can understand. The defined type must be one of the types supported by Tajo. For supported types, please refer to :doc:`/sql_language/data_model`. 
+ +----------------------- +Query example +----------------------- + +Once the Python UDFs are successfully registered, you can use them as other built-in functions. + +.. code-block:: sql + + default> select concat_py(n_name)::text from nation where sum_py(n_regionkey,1) > 2; + http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-docs/src/main/sphinx/functions/json_func.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/functions/json_func.rst b/tajo-docs/src/main/sphinx/functions/json_func.rst index 5bf5814..d05cc1b 100644 --- a/tajo-docs/src/main/sphinx/functions/json_func.rst +++ b/tajo-docs/src/main/sphinx/functions/json_func.rst @@ -3,6 +3,7 @@ JSON Functions ******************************* .. function:: json_extract_path_text (string json, string xpath) + Extracts JSON string from a JSON string based on json path specified and returns JSON string pointed to by xPath :param string: http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java index 42a7380..0c5a012 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan; import com.google.common.collect.Sets; +import org.apache.commons.collections.set.UnmodifiableSet; import org.apache.tajo.OverridableConf; import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.*; @@ -34,7 +35,6 @@ import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.plan.algebra.BaseAlgebraVisitor; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.function.AggFunction; -import 
org.apache.tajo.plan.function.GeneralFunction; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.nameresolver.NameResolver; import org.apache.tajo.plan.nameresolver.NameResolvingMode; @@ -43,6 +43,7 @@ import org.apache.tajo.util.TUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; +import java.io.IOException; import java.util.Set; import java.util.Stack; import java.util.TimeZone; @@ -384,7 +385,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva if (!EvalTreeUtil.checkIfCanBeConstant(evalNodes[i])) { throw new PlanningException("Non constant values cannot be included in IN PREDICATE."); } - values[i] = EvalTreeUtil.evaluateImmediately(evalNodes[i]); + values[i] = EvalTreeUtil.evaluateImmediately(null, evalNodes[i]); } return new RowConstantEval(values); } @@ -608,7 +609,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva FunctionType functionType = funcDesc.getFuncType(); if (functionType == FunctionType.GENERAL || functionType == FunctionType.UDF) { - return new GeneralFunctionEval(ctx.queryContext, funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs); + return new GeneralFunctionEval(ctx.queryContext, funcDesc, givenArgs); } else if (functionType == FunctionType.AGGREGATION || functionType == FunctionType.UDA) { if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) { @@ -623,6 +624,8 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva } } catch (InternalException e) { throw new PlanningException(e); + } catch (IOException e) { + throw new PlanningException(e); } } @@ -679,7 +682,8 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva } public static final Set<String> WINDOW_FUNCTIONS = - Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", "cume_dist", "first_value", "lag"); + UnmodifiableSet.decorate( + Sets.newHashSet("row_number", 
"rank", "dense_rank", "percent_rank", "cume_dist", "first_value", "lag")); public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, WindowFunctionExpr windowFunc) throws PlanningException { @@ -904,9 +908,9 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva public static int [] dateToIntArray(String years, String months, String days) throws PlanningException { - int year = Integer.valueOf(years); - int month = Integer.valueOf(months); - int day = Integer.valueOf(days); + int year = Integer.parseInt(years); + int month = Integer.parseInt(months); + int day = Integer.parseInt(days); if (!(1 <= year && year <= 9999)) { throw new PlanningException(String.format("Years (%d) must be between 1 and 9999 integer value", year)); @@ -930,12 +934,12 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva public static int [] timeToIntArray(String hours, String minutes, String seconds, String fractionOfSecond) throws PlanningException { - int hour = Integer.valueOf(hours); - int minute = Integer.valueOf(minutes); - int second = Integer.valueOf(seconds); + int hour = Integer.parseInt(hours); + int minute = Integer.parseInt(minutes); + int second = Integer.parseInt(seconds); int fraction = 0; if (fractionOfSecond != null) { - fraction = Integer.valueOf(fractionOfSecond); + fraction = Integer.parseInt(fractionOfSecond); } if (!(0 <= hour && hour <= 23)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index d7c631b..d1c1a15 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -115,6 +115,10 @@ public class LogicalPlanner 
extends BaseAlgebraVisitor<LogicalPlanner.PlanContex return queryBlock; } + public OverridableConf getQueryContext() { + return queryContext; + } + public String toString() { return "block=" + queryBlock.getName() + ", relNum=" + queryBlock.getRelations().size() + ", "+ queryBlock.namedExprsMgr.toString(); @@ -132,7 +136,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex } @VisibleForTesting - public LogicalPlan createPlan(OverridableConf queryContext, Expr expr, boolean debug) throws PlanningException { + public LogicalPlan createPlan(OverridableConf queryContext, Expr expr, boolean debug) + throws PlanningException { LogicalPlan plan = new LogicalPlan(this); @@ -810,7 +815,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex limitNode.setInSchema(child.getOutSchema()); limitNode.setOutSchema(child.getOutSchema()); - firstFetNum.bind(null); + firstFetNum.bind(null, null); limitNode.setFetchFirst(firstFetNum.eval(null).asInt8()); return limitNode; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java index fb05f33..6cf7272 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java @@ -149,7 +149,7 @@ public class AlgebraicUtil { } if (lhs.getType() == EvalType.CONST && rhs.getType() == EvalType.CONST) { - return new ConstEval(binaryEval.bind(null).eval(null)); + return new ConstEval(binaryEval.bind(null, null).eval(null)); } return binaryEval; @@ -162,7 +162,7 @@ public class AlgebraicUtil { stack.pop(); if (child.getType() == EvalType.CONST) { - return new ConstEval(unaryEval.bind(null).eval(null)); + return 
new ConstEval(unaryEval.bind(null, null).eval(null)); } return unaryEval; @@ -184,7 +184,7 @@ public class AlgebraicUtil { } if (constantOfAllDescendents && evalNode.getType() == EvalType.FUNCTION) { - return new ConstEval(evalNode.bind(null).eval(null)); + return new ConstEval(evalNode.bind(null, null).eval(null)); } else { return evalNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java index 2db4368..eb3e36b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/BetweenPredicateEval.java @@ -80,7 +80,7 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { } private static interface Checker { - void bind(Schema schema); + void bind(EvalContext evalContext, Schema schema); Datum eval(Tuple param); } @@ -103,8 +103,8 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { } @Override - public void bind(Schema schema) { - predicand.bind(schema); + public void bind(EvalContext evalContext, Schema schema) { + predicand.bind(evalContext, schema); } @Override @@ -134,10 +134,10 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { } @Override - public void bind(Schema schema) { - predicand.bind(schema); - begin.bind(schema); - end.bind(schema); + public void bind(EvalContext evalContext, Schema schema) { + predicand.bind(evalContext, schema); + begin.bind(evalContext, schema); + end.bind(evalContext, schema); } @Override @@ -170,10 +170,10 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { } @Override - public void bind(Schema schema) { - predicand.bind(schema); - 
begin.bind(schema); - end.bind(schema); + public void bind(EvalContext evalContext, Schema schema) { + predicand.bind(evalContext, schema); + begin.bind(evalContext, schema); + end.bind(evalContext, schema); } @Override @@ -227,8 +227,8 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { } @Override - public EvalNode bind(Schema schema) { - super.bind(schema); + public EvalNode bind(EvalContext evalContext, Schema schema) { + super.bind(evalContext, schema); if (begin.getType() == EvalType.CONST && end.getType() == EvalType.CONST) { Datum beginValue = ((ConstEval)begin).getValue(); Datum endValue = ((ConstEval)end).getValue(); @@ -245,7 +245,7 @@ public class BetweenPredicateEval extends EvalNode implements Cloneable { checker = new AsymmetricChecker(not, predicand, begin, end); } } - checker.bind(schema); + checker.bind(evalContext, schema); return this; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java index f174b79..b8b768b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CastEval.java @@ -31,7 +31,7 @@ import java.util.TimeZone; import static org.apache.tajo.common.TajoDataTypes.DataType; -public class CastEval extends UnaryEval { +public class CastEval extends UnaryEval implements Cloneable { @Expose private DataType target; @Expose private TimeZone timezone; @@ -97,11 +97,24 @@ public class CastEval extends UnaryEval { boolean valid = obj != null && obj instanceof CastEval; if (valid) { CastEval another = (CastEval) obj; - return child.equals(another.child) && - target.equals(another.target) && - TUtil.checkEquals(timezone, another.timezone); + boolean b1 = 
child.equals(another.child); + boolean b2 = target.equals(another.target); + boolean b3 = TUtil.checkEquals(timezone, another.timezone); + return b1 && b2 && b3; } else { return false; } } + + @Override + public Object clone() throws CloneNotSupportedException { + CastEval clone = (CastEval) super.clone(); + if (target != null) { + clone.target = target; + } + if (timezone != null) { + clone.timezone = timezone; + } + return clone; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java new file mode 100644 index 0000000..6a30e77 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalContext.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.expr; + +import org.apache.tajo.plan.function.python.TajoScriptEngine; +import org.apache.tajo.util.TUtil; + +import java.util.Collection; +import java.util.Map; + +public class EvalContext { + private final Map<EvalNode, TajoScriptEngine> scriptEngineMap = TUtil.newHashMap(); + + public void addScriptEngine(EvalNode evalNode, TajoScriptEngine scriptExecutor) { + this.scriptEngineMap.put(evalNode, scriptExecutor); + } + + public boolean hasScriptEngine(EvalNode evalNode) { + return this.scriptEngineMap.containsKey(evalNode); + } + + public TajoScriptEngine getScriptEngine(EvalNode evalNode) { + return this.scriptEngineMap.get(evalNode); + } + + public Collection<TajoScriptEngine> getAllScriptEngines() { + return this.scriptEngineMap.values(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java index 154b0fd..9abb0bc 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan.expr; import com.google.gson.annotations.Expose; +import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.common.TajoDataTypes.DataType; @@ -61,9 +62,9 @@ public abstract class EvalNode implements Cloneable, GsonObject, ProtoObject<Pla return PlanGsonHelper.toJson(this, EvalNode.class); } - public EvalNode bind(Schema schema) { + public EvalNode bind(@Nullable EvalContext evalContext, Schema schema) { for (int i = 0; i < childNum(); i++) { - getChild(i).bind(schema); + getChild(i).bind(evalContext, schema); } isBinded = true; return this; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java index 5e3843c..1fa2fe0 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java @@ -570,8 +570,8 @@ public class EvalTreeUtil { return findUniqueColumns(evalNode).size() == 0 && findDistinctAggFunction(evalNode).size() == 0; } - public static Datum evaluateImmediately(EvalNode evalNode) { - evalNode.bind(null); + public static Datum evaluateImmediately(EvalContext evalContext, EvalNode evalNode) { + evalNode.bind(evalContext, null); return evalNode.eval(null); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java index 83d00b9..870970b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FieldEval.java @@ -40,8 +40,8 @@ public class FieldEval extends EvalNode implements Cloneable { } @Override - public EvalNode bind(Schema schema) { - super.bind(schema); + public EvalNode bind(EvalContext evalContext, Schema schema) { + super.bind(evalContext, schema); // TODO - column namespace should be improved to simplify name handling and resolving. 
if (column.hasQualifier()) { fieldId = schema.getColumnId(column.getQualifiedName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java index 4ff7548..dd9121b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java @@ -49,8 +49,8 @@ public abstract class FunctionEval extends EvalNode implements Cloneable { } @Override - public EvalNode bind(Schema schema) { - super.bind(schema); + public EvalNode bind(EvalContext evalContext, Schema schema) { + super.bind(evalContext, schema); this.params = new VTuple(argEvals.length); return this; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java index 2ce7850..30fbe91 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/GeneralFunctionEval.java @@ -18,54 +18,56 @@ package org.apache.tajo.plan.expr; -import com.google.common.base.Objects; import com.google.gson.annotations.Expose; import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; -import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.plan.function.FunctionInvoke; +import org.apache.tajo.plan.function.FunctionInvokeContext; import 
org.apache.tajo.storage.Tuple; -import org.apache.tajo.util.TUtil; -import javax.annotation.Nullable; +import java.io.IOException; public class GeneralFunctionEval extends FunctionEval { - @Expose protected GeneralFunction instance; + protected FunctionInvoke funcInvoke; + @Expose protected FunctionInvokeContext invokeContext; - public GeneralFunctionEval(@Nullable OverridableConf queryContext, FunctionDesc desc, GeneralFunction instance, - EvalNode[] givenArgs) { + public GeneralFunctionEval(OverridableConf queryContext, FunctionDesc desc, EvalNode[] givenArgs) + throws IOException { super(EvalType.FUNCTION, desc, givenArgs); - this.instance = instance; - this.instance.init(queryContext, getParamType()); + this.invokeContext = new FunctionInvokeContext(queryContext, getParamType()); + } + + @Override + public EvalNode bind(EvalContext evalContext, Schema schema) { + super.bind(evalContext, schema); + try { + this.funcInvoke = FunctionInvoke.newInstance(funcDesc); + if (evalContext != null && evalContext.hasScriptEngine(this)) { + this.invokeContext.setScriptEngine(evalContext.getScriptEngine(this)); + } + this.funcInvoke.init(invokeContext); + } catch (IOException e) { + throw new RuntimeException(e); + } + return this; } @Override @SuppressWarnings("unchecked") public Datum eval(Tuple tuple) { super.eval(tuple); - return instance.eval(evalParams(tuple)); + Datum res = funcInvoke.eval(evalParams(tuple)); + return res; } @Override - public boolean equals(Object obj) { - if (obj instanceof GeneralFunctionEval) { - GeneralFunctionEval other = (GeneralFunctionEval) obj; - return super.equals(other) && - TUtil.checkEquals(instance, other.instance); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(funcDesc, instance); - } - - @Override public Object clone() throws CloneNotSupportedException { GeneralFunctionEval eval = (GeneralFunctionEval) super.clone(); - eval.instance = (GeneralFunction) instance.clone(); + if (funcInvoke != 
null) { + eval.funcInvoke = (FunctionInvoke) funcInvoke.clone(); + } return eval; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java index 6faa667..cdd8dfb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/PatternMatchPredicateEval.java @@ -74,8 +74,8 @@ public abstract class PatternMatchPredicateEval extends BinaryEval { } @Override - public EvalNode bind(Schema schema) { - super.bind(schema); + public EvalNode bind(EvalContext evalContext, Schema schema) { + super.bind(evalContext, schema); compile(pattern); return this; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java index 7e03224..2440c52 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizationRule.java @@ -20,9 +20,10 @@ package org.apache.tajo.plan.exprrewrite; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.annotator.Prioritized; +import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; @Prioritized public interface EvalTreeOptimizationRule { - public EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode 
tree); + EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode tree); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java index a8a3ff3..680600a 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/EvalTreeOptimizer.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.annotator.Prioritized; +import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.util.ClassUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java index d77f072..a8b0945 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java @@ -18,17 +18,23 @@ package org.apache.tajo.plan.exprrewrite.rules; +import org.apache.tajo.datum.Datum; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.exprrewrite.EvalTreeOptimizationRule; import org.apache.tajo.plan.annotator.Prioritized; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.function.python.PythonScriptEngine; +import 
org.apache.tajo.plan.function.python.TajoScriptEngine; +import java.io.IOException; import java.util.Stack; @Prioritized(priority = 10) public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanContext> implements EvalTreeOptimizationRule { + private final static String SLEEP_FUNCTION_NAME = "sleep"; + @Override public EvalNode optimize(LogicalPlanner.PlanContext context, EvalNode evalNode) { return visit(context, evalNode, new Stack<EvalNode>()); @@ -49,7 +55,7 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo } if (lhs.getType() == EvalType.CONST && rhs.getType() == EvalType.CONST) { - binaryEval.bind(null); + binaryEval.bind(null, null); return new ConstEval(binaryEval.eval(null)); } @@ -62,8 +68,9 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo EvalNode child = visit(context, unaryEval.getChild(), stack); stack.pop(); + unaryEval.setChild(child); if (child.getType() == EvalType.CONST) { - unaryEval.bind(null); + unaryEval.bind(null, null); return new ConstEval(unaryEval.eval(null)); } @@ -74,7 +81,7 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo public EvalNode visitFuncCall(LogicalPlanner.PlanContext context, FunctionEval evalNode, Stack<EvalNode> stack) { boolean constantOfAllDescendents = true; - if ("sleep".equals(evalNode.getFuncDesc().getFunctionName())) { + if (SLEEP_FUNCTION_NAME.equals(evalNode.getFuncDesc().getFunctionName())) { constantOfAllDescendents = false; } else { for (EvalNode arg : evalNode.getArgs()) { @@ -84,8 +91,24 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo } if (constantOfAllDescendents && evalNode.getType() == EvalType.FUNCTION) { - evalNode.bind(null); - return new ConstEval(evalNode.eval(null)); + if (evalNode.getFuncDesc().getInvocation().hasPython()) { + TajoScriptEngine executor = new PythonScriptEngine(evalNode.getFuncDesc()); + try { + 
executor.start(context.getQueryContext().getConf()); + EvalContext evalContext = new EvalContext(); + evalContext.addScriptEngine(evalNode, executor); + evalNode.bind(evalContext, null); + Datum funcRes = evalNode.eval(null); + executor.shutdown(); + return new ConstEval(funcRes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + evalNode.bind(null, null); + return new ConstEval(evalNode.eval(null)); + } + } else { return evalNode; }
