Repository: tajo Updated Branches: refs/heads/master 2d2f192d7 -> e296a0d1f
http://git-wip-us.apache.org/repos/asf/tajo/blob/e296a0d1/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 74c0e5a..02dff19 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -75,28 +75,35 @@ public class PythonScriptEngine extends TajoScriptEngine { } finally { in.close(); } + for(FunctionInfo funcInfo : functions) { - FunctionSignature signature; - FunctionInvocation invocation = new FunctionInvocation(); - FunctionSupplement supplement = new FunctionSupplement(); - if (funcInfo instanceof ScalarFuncInfo) { - ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo; - TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0]; - signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, scalarFuncInfo.funcName, - returnType, createParamTypes(scalarFuncInfo.paramNum)); - PythonInvocationDesc invocationDesc = new PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true); - invocation.setPython(invocationDesc); - functionDescs.add(new FunctionDesc(signature, invocation, supplement)); - } else { - AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo; - if (isValidUdaf(aggFuncInfo)) { - TajoDataTypes.DataType returnType = getReturnTypes(aggFuncInfo.getFinalResultInfo)[0]; - signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, aggFuncInfo.funcName, - returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum)); - PythonInvocationDesc invocationDesc = new PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false); - invocation.setPythonAggregation(invocationDesc); + try { + FunctionSignature signature; + FunctionInvocation invocation = new FunctionInvocation(); + FunctionSupplement supplement = new FunctionSupplement(); + if (funcInfo instanceof ScalarFuncInfo) { + ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo; + TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0]; + signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, scalarFuncInfo.funcName, + returnType, createParamTypes(scalarFuncInfo.paramNum)); + PythonInvocationDesc invocationDesc = new PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true); + invocation.setPython(invocationDesc); functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + } else { + AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo; + if (isValidUdaf(aggFuncInfo)) { + TajoDataTypes.DataType returnType = getReturnTypes(aggFuncInfo.getFinalResultInfo)[0]; + signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, aggFuncInfo.funcName, + returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum)); + PythonInvocationDesc invocationDesc = new PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false); + + invocation.setPythonAggregation(invocationDesc); + functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + } } + } catch (Throwable t) { + // ignore invalid functions + LOG.warn(t); } } return functionDescs; @@ -168,59 +175,65 @@ public class PythonScriptEngine extends TajoScriptEngine { String line = br.readLine(); String[] quotedSchemaStrings = null; AggFuncInfo aggFuncInfo = null; + while (line != null) { - if (pSchema.matcher(line).matches()) { - int start = line.indexOf("(") + 1; //drop brackets - int end = line.lastIndexOf(")"); - quotedSchemaStrings = line.substring(start,end).trim().split(","); - } else if (pDef.matcher(line).matches()) { - boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0; - int nameStart = line.indexOf("def ") + "def ".length(); - int nameEnd = line.indexOf('('); - int signatureEnd = line.indexOf(')'); - String[] params = line.substring(nameEnd+1, signatureEnd).split(","); - int paramNum; - if (params.length == 1) { - paramNum = params[0].equals("") ? 0 : 1; - } else { - paramNum = params.length; - } - if (params[0].trim().equals("self")) { - paramNum--; - } + try { + if (pSchema.matcher(line).matches()) { + int start = line.indexOf("(") + 1; //drop brackets + int end = line.lastIndexOf(")"); + quotedSchemaStrings = line.substring(start, end).trim().split(","); + } else if (pDef.matcher(line).matches()) { + boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0; + int nameStart = line.indexOf("def ") + "def ".length(); + int nameEnd = line.indexOf('('); + int signatureEnd = line.indexOf(')'); + String[] params = line.substring(nameEnd + 1, signatureEnd).split(","); + int paramNum; + if (params.length == 1) { + paramNum = params[0].equals("") ? 0 : 1; + } else { + paramNum = params.length; + } + if (params[0].trim().equals("self")) { + paramNum--; + } - String functionName = line.substring(nameStart, nameEnd).trim(); - quotedSchemaStrings = quotedSchemaStrings == null ? new String[] {"'blob'"} : quotedSchemaStrings; - - if (isUdaf) { - if (functionName.equals("eval")) { - aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); - } else if (functionName.equals("merge")) { - aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); - } else if (functionName.equals("get_partial_result")) { - aggFuncInfo.getPartialResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); - } else if (functionName.equals("get_final_result")) { - aggFuncInfo.getFinalResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + String functionName = line.substring(nameStart, nameEnd).trim(); + quotedSchemaStrings = quotedSchemaStrings == null ? new String[]{"'blob'"} : quotedSchemaStrings; + + if (isUdaf) { + if (functionName.equals("eval")) { + aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("merge")) { + aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("get_partial_result")) { + aggFuncInfo.getPartialResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("get_final_result")) { + aggFuncInfo.getFinalResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } + } else { + aggFuncInfo = null; + functions.add(new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum)); } - } else { - aggFuncInfo = null; - functions.add(new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum)); - } - quotedSchemaStrings = null; - } else if (pClass.matcher(line).matches()) { - // UDAF - if (aggFuncInfo != null) { - functions.add(aggFuncInfo); - } - aggFuncInfo = new AggFuncInfo(); - int classNameStart = line.indexOf("class ") + "class ".length(); - int classNameEnd = line.indexOf("("); - if (classNameEnd < 0) { - classNameEnd = line.indexOf(":"); + quotedSchemaStrings = null; + } else if (pClass.matcher(line).matches()) { + // UDAF + if (aggFuncInfo != null) { + functions.add(aggFuncInfo); + } + aggFuncInfo = new AggFuncInfo(); + int classNameStart = line.indexOf("class ") + "class ".length(); + int classNameEnd = line.indexOf("("); + if (classNameEnd < 0) { + classNameEnd = line.indexOf(":"); + } + aggFuncInfo.className = line.substring(classNameStart, classNameEnd).trim(); + aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase(); } - aggFuncInfo.className = line.substring(classNameStart, classNameEnd).trim(); - aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase(); + } catch (Throwable t) { + // ignore unexpected function and source lines + LOG.warn(t); } line = br.readLine(); }
