Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 5ef0edb0d -> b2a972220


http://git-wip-us.apache.org/repos/asf/tajo/blob/b2a97222/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 74c0e5a..02dff19 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -75,28 +75,35 @@ public class PythonScriptEngine extends TajoScriptEngine {
     } finally {
       in.close();
     }
+
     for(FunctionInfo funcInfo : functions) {
-      FunctionSignature signature;
-      FunctionInvocation invocation = new FunctionInvocation();
-      FunctionSupplement supplement = new FunctionSupplement();
-      if (funcInfo instanceof ScalarFuncInfo) {
-        ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo;
-        TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0];
-        signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, 
scalarFuncInfo.funcName,
-            returnType, createParamTypes(scalarFuncInfo.paramNum));
-        PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true);
-        invocation.setPython(invocationDesc);
-        functionDescs.add(new FunctionDesc(signature, invocation, supplement));
-      } else {
-        AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo;
-        if (isValidUdaf(aggFuncInfo)) {
-          TajoDataTypes.DataType returnType = 
getReturnTypes(aggFuncInfo.getFinalResultInfo)[0];
-          signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, 
aggFuncInfo.funcName,
-              returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum));
-          PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false);
-          invocation.setPythonAggregation(invocationDesc);
+      try {
+        FunctionSignature signature;
+        FunctionInvocation invocation = new FunctionInvocation();
+        FunctionSupplement supplement = new FunctionSupplement();
+        if (funcInfo instanceof ScalarFuncInfo) {
+          ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo;
+          TajoDataTypes.DataType returnType = 
getReturnTypes(scalarFuncInfo)[0];
+          signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, 
scalarFuncInfo.funcName,
+              returnType, createParamTypes(scalarFuncInfo.paramNum));
+          PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true);
+          invocation.setPython(invocationDesc);
           functionDescs.add(new FunctionDesc(signature, invocation, 
supplement));
+        } else {
+          AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo;
+          if (isValidUdaf(aggFuncInfo)) {
+            TajoDataTypes.DataType returnType = 
getReturnTypes(aggFuncInfo.getFinalResultInfo)[0];
+            signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, 
aggFuncInfo.funcName,
+                returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum));
+            PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false);
+
+            invocation.setPythonAggregation(invocationDesc);
+            functionDescs.add(new FunctionDesc(signature, invocation, 
supplement));
+          }
         }
+      } catch (Throwable t) {
+        // ignore invalid functions
+        LOG.warn(t);
       }
     }
     return functionDescs;
@@ -168,59 +175,65 @@ public class PythonScriptEngine extends TajoScriptEngine {
     String line = br.readLine();
     String[] quotedSchemaStrings = null;
     AggFuncInfo aggFuncInfo = null;
+
     while (line != null) {
-      if (pSchema.matcher(line).matches()) {
-        int start = line.indexOf("(") + 1; //drop brackets
-        int end = line.lastIndexOf(")");
-        quotedSchemaStrings = line.substring(start,end).trim().split(",");
-      } else if (pDef.matcher(line).matches()) {
-        boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0;
-        int nameStart = line.indexOf("def ") + "def ".length();
-        int nameEnd = line.indexOf('(');
-        int signatureEnd = line.indexOf(')');
-        String[] params = line.substring(nameEnd+1, signatureEnd).split(",");
-        int paramNum;
-        if (params.length == 1) {
-          paramNum = params[0].equals("") ? 0 : 1;
-        } else {
-          paramNum = params.length;
-        }
-        if (params[0].trim().equals("self")) {
-          paramNum--;
-        }
+      try {
+        if (pSchema.matcher(line).matches()) {
+          int start = line.indexOf("(") + 1; //drop brackets
+          int end = line.lastIndexOf(")");
+          quotedSchemaStrings = line.substring(start, end).trim().split(",");
+        } else if (pDef.matcher(line).matches()) {
+          boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0;
+          int nameStart = line.indexOf("def ") + "def ".length();
+          int nameEnd = line.indexOf('(');
+          int signatureEnd = line.indexOf(')');
+          String[] params = line.substring(nameEnd + 1, 
signatureEnd).split(",");
+          int paramNum;
+          if (params.length == 1) {
+            paramNum = params[0].equals("") ? 0 : 1;
+          } else {
+            paramNum = params.length;
+          }
+          if (params[0].trim().equals("self")) {
+            paramNum--;
+          }
 
-        String functionName = line.substring(nameStart, nameEnd).trim();
-        quotedSchemaStrings = quotedSchemaStrings == null ? new String[] 
{"'blob'"} : quotedSchemaStrings;
-
-        if (isUdaf) {
-          if (functionName.equals("eval")) {
-            aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
-          } else if (functionName.equals("merge")) {
-            aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
-          } else if (functionName.equals("get_partial_result")) {
-            aggFuncInfo.getPartialResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
-          } else if (functionName.equals("get_final_result")) {
-            aggFuncInfo.getFinalResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
+          String functionName = line.substring(nameStart, nameEnd).trim();
+          quotedSchemaStrings = quotedSchemaStrings == null ? new 
String[]{"'blob'"} : quotedSchemaStrings;
+
+          if (isUdaf) {
+            if (functionName.equals("eval")) {
+              aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
+            } else if (functionName.equals("merge")) {
+              aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
+            } else if (functionName.equals("get_partial_result")) {
+              aggFuncInfo.getPartialResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
+            } else if (functionName.equals("get_final_result")) {
+              aggFuncInfo.getFinalResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
+            }
+          } else {
+            aggFuncInfo = null;
+            functions.add(new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum));
           }
-        } else {
-          aggFuncInfo = null;
-          functions.add(new ScalarFuncInfo(quotedSchemaStrings, functionName, 
paramNum));
-        }
 
-        quotedSchemaStrings = null;
-      } else if (pClass.matcher(line).matches()) {
-        // UDAF
-        if (aggFuncInfo != null) {
-          functions.add(aggFuncInfo);
-        }
-        aggFuncInfo = new AggFuncInfo();
-        int classNameStart = line.indexOf("class ") + "class ".length();
-        int classNameEnd = line.indexOf("(");
-        if (classNameEnd < 0) {
-          classNameEnd = line.indexOf(":");
+          quotedSchemaStrings = null;
+        } else if (pClass.matcher(line).matches()) {
+          // UDAF
+          if (aggFuncInfo != null) {
+            functions.add(aggFuncInfo);
+          }
+          aggFuncInfo = new AggFuncInfo();
+          int classNameStart = line.indexOf("class ") + "class ".length();
+          int classNameEnd = line.indexOf("(");
+          if (classNameEnd < 0) {
+            classNameEnd = line.indexOf(":");
+          }
+          aggFuncInfo.className = line.substring(classNameStart, 
classNameEnd).trim();
+          aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase();
         }
-        aggFuncInfo.className = line.substring(classNameStart, 
classNameEnd).trim();
-        aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase();
+      } catch (Throwable t) {
+        // ignore unexpected function and source lines
+        LOG.warn(t);
       }
       line = br.readLine();
     }

Reply via email to