Repository: tajo
Updated Branches:
  refs/heads/index_support e6b91c333 -> 86c97b2a1


http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
deleted file mode 100644
index 6b2c116..0000000
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.plan.function;
-
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.plan.expr.EvalContext;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.TUtil;
-
-/**
- * This class invokes the legacy scala functions.
- */
-public class LegacyScalarFunctionInvoke extends FunctionInvoke implements 
Cloneable {
-  @Expose private GeneralFunction function;
-
-  public LegacyScalarFunctionInvoke() {
-
-  }
-
-  public LegacyScalarFunctionInvoke(FunctionDesc funcDesc) throws 
InternalException {
-    super(funcDesc);
-    function = (GeneralFunction) funcDesc.newInstance();
-  }
-
-  @Override
-  public void setFunctionDesc(FunctionDesc desc) throws InternalException {
-    super.setFunctionDesc(desc);
-    function = (GeneralFunction) functionDesc.newInstance();
-  }
-
-  @Override
-  public void init(FunctionInvokeContext context) {
-    function.init(context.getQueryContext(), context.getParamTypes());
-  }
-
-  @Override
-  public Datum eval(Tuple tuple) {
-    return function.eval(tuple);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof LegacyScalarFunctionInvoke) {
-      LegacyScalarFunctionInvoke other = (LegacyScalarFunctionInvoke) o;
-      return super.equals(other) &&
-          TUtil.checkEquals(function, other.function);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return function.hashCode();
-  }
-
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    LegacyScalarFunctionInvoke clone = (LegacyScalarFunctionInvoke) 
super.clone();
-    clone.function = (GeneralFunction) function.clone();
-    return clone;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java
new file mode 100644
index 0000000..c7fddf9
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class PythonAggFunctionInvoke extends AggFunctionInvoke implements 
Cloneable {
+
+  private transient PythonScriptEngine scriptEngine;
+  private transient PythonAggFunctionContext prevContext;
+  private static int nextContextId = 0;
+
+  /**
+   * Aggregated result should be kept in Tajo task rather than Python UDAF to 
control memory usage.
+   * {@link PythonAggFunctionContext} is to support executing aggregation with 
keys.
+   * It stores a snapshot of Python UDAF class instance as a json string.
+   *
+   * For each UDAF call with different aggregation key,
+   * {@link PythonAggFunctionInvoke} calls {@link 
PythonAggFunctionInvoke#updateContextIfNecessary} to backup and restore
+   * intermediate aggregation states for the previous key and the current key, 
respectively.
+   */
+  public static class PythonAggFunctionContext implements FunctionContext {
+    final int id; // id to identify each context
+    String jsonData; // snapshot of Python class
+
+    public PythonAggFunctionContext() {
+      this.id = nextContextId++;
+    }
+
+    public void setJsonData(String jsonData) {
+      this.jsonData = jsonData;
+    }
+
+    public String getJsonData() {
+      return jsonData;
+    }
+  }
+
+  public PythonAggFunctionInvoke(FunctionDesc functionDesc) {
+    super(functionDesc);
+  }
+
+  @Override
+  public void init(FunctionInvokeContext context) throws IOException {
+    this.scriptEngine = (PythonScriptEngine) context.getScriptEngine();
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new PythonAggFunctionContext();
+  }
+
+  /**
+   * Context does not need to be updated per every UDAF call.
+   * If the current aggregation key is same with the previous one,
+   * python-side context doesn't need to be updated because it already 
contains necessary intermediate result.
+   *
+   * @param context
+   */
+  private void updateContextIfNecessary(FunctionContext context) {
+    PythonAggFunctionContext givenContext = (PythonAggFunctionContext) context;
+    if (prevContext == null || prevContext.id != givenContext.id) {
+      try {
+        if (prevContext != null) {
+          scriptEngine.updateJavaSideContext(prevContext);
+        }
+        scriptEngine.updatePythonSideContext(givenContext);
+        prevContext = givenContext;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public void eval(FunctionContext context, Tuple params) {
+    updateContextIfNecessary(context);
+    scriptEngine.callAggFunc(context, params);
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple params) {
+    if (params.get(0) instanceof NullDatum) {
+      return;
+    }
+
+    updateContextIfNecessary(context);
+    scriptEngine.callAggFunc(context, params);
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext context) {
+    updateContextIfNecessary(context);
+    // partial results are stored as json strings.
+    return DatumFactory.createText(scriptEngine.getPartialResult(context));
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT);
+  }
+
+  @Override
+  public Datum terminate(FunctionContext context) {
+    updateContextIfNecessary(context);
+    return scriptEngine.getFinalResult(context);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    // nothing to do
+    return super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
index 1019c60..342bd98 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
@@ -47,7 +47,7 @@ public class PythonFunctionInvoke extends FunctionInvoke 
implements Cloneable {
 
   @Override
   public Datum eval(Tuple tuple) {
-    Datum res = scriptEngine.eval(tuple);
+    Datum res = scriptEngine.callScalarFunc(tuple);
     return res;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 0da30f1..878553f 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -27,10 +27,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.function.FunctionInvocation;
-import org.apache.tajo.function.FunctionSignature;
-import org.apache.tajo.function.FunctionSupplement;
-import org.apache.tajo.function.PythonInvocationDesc;
+import org.apache.tajo.function.*;
+import org.apache.tajo.plan.function.FunctionContext;
+import 
org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
 import org.apache.tajo.plan.function.stream.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -70,25 +69,47 @@ public class PythonScriptEngine extends TajoScriptEngine {
     Set<FunctionDesc> functionDescs = TUtil.newHashSet();
 
     InputStream in = getScriptAsStream(path);
-    List<FuncInfo> functions = null;
+    List<FunctionInfo> functions = null;
     try {
       functions = getFunctions(in);
     } finally {
       in.close();
     }
-    for(FuncInfo funcInfo : functions) {
-      TajoDataTypes.DataType returnType = 
CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(funcInfo.returnType));
-      FunctionSignature signature = new 
FunctionSignature(CatalogProtos.FunctionType.UDF, funcInfo.funcName,
-          returnType, createParamTypes(funcInfo.paramNum));
+    for(FunctionInfo funcInfo : functions) {
+      FunctionSignature signature;
       FunctionInvocation invocation = new FunctionInvocation();
-      PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(funcInfo.funcName, path.getPath());
-      invocation.setPython(invocationDesc);
       FunctionSupplement supplement = new FunctionSupplement();
-      functionDescs.add(new FunctionDesc(signature, invocation, supplement));
+      if (funcInfo instanceof ScalarFuncInfo) {
+        ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo;
+        TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0];
+        signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, 
scalarFuncInfo.funcName,
+            returnType, createParamTypes(scalarFuncInfo.paramNum));
+        PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true);
+        invocation.setPython(invocationDesc);
+        functionDescs.add(new FunctionDesc(signature, invocation, supplement));
+      } else {
+        AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo;
+        if (isValidUdaf(aggFuncInfo)) {
+          TajoDataTypes.DataType returnType = 
getReturnTypes(aggFuncInfo.getFinalResultInfo)[0];
+          signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, 
aggFuncInfo.funcName,
+              returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum));
+          PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false);
+          invocation.setPythonAggregation(invocationDesc);
+          functionDescs.add(new FunctionDesc(signature, invocation, 
supplement));
+        }
+      }
     }
     return functionDescs;
   }
 
+  private static boolean isValidUdaf(AggFuncInfo aggFuncInfo) {
+    if (aggFuncInfo.className != null && aggFuncInfo.evalInfo != null && 
aggFuncInfo.mergeInfo != null
+      && aggFuncInfo.getPartialResultInfo != null && 
aggFuncInfo.getFinalResultInfo != null) {
+      return true;
+    }
+    return false;
+  }
+
   private static TajoDataTypes.DataType[] createParamTypes(int paramNum) {
     TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[paramNum];
     for (int i = 0; i < paramNum; i++) {
@@ -97,34 +118,63 @@ public class PythonScriptEngine extends TajoScriptEngine {
     return paramTypes;
   }
 
-  private static final Pattern pSchema = 
Pattern.compile("^\\s*\\W+outputType.*");
+  private static TajoDataTypes.DataType[] getReturnTypes(ScalarFuncInfo 
scalarFuncInfo) {
+    TajoDataTypes.DataType[] returnTypes = new 
TajoDataTypes.DataType[scalarFuncInfo.returnTypes.length];
+    for (int i = 0; i < scalarFuncInfo.returnTypes.length; i++) {
+      returnTypes[i] = 
CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(scalarFuncInfo.returnTypes[i]));
+    }
+    return returnTypes;
+  }
+
+  private static final Pattern pSchema = 
Pattern.compile("^\\s*\\W+output_type.*");
   private static final Pattern pDef = 
Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");
+  private static final Pattern pClass = Pattern.compile("class.*");
 
-  private static class FuncInfo {
-    String returnType;
+  private interface FunctionInfo {
+
+  }
+
+  private static class AggFuncInfo implements FunctionInfo {
+    String className;
+    String funcName;
+    ScalarFuncInfo evalInfo;
+    ScalarFuncInfo mergeInfo;
+    ScalarFuncInfo getPartialResultInfo;
+    ScalarFuncInfo getFinalResultInfo;
+  }
+
+  private static class ScalarFuncInfo implements FunctionInfo {
+    String[] returnTypes;
     String funcName;
     int paramNum;
 
-    public FuncInfo(String returnType, String funcName, int paramNum) {
-      this.returnType = returnType.toUpperCase();
+    public ScalarFuncInfo(String[] quotedSchemaStrings, String funcName, int 
paramNum) {
+      this.returnTypes = new String[quotedSchemaStrings.length];
+      for (int i = 0; i < quotedSchemaStrings.length; i++) {
+        String quotedString = quotedSchemaStrings[i].trim();
+        String[] tokens = quotedString.substring(1, 
quotedString.length()-1).split(":");
+        this.returnTypes[i] = tokens.length == 1 ? tokens[0].toUpperCase() : 
tokens[1].toUpperCase();
+      }
       this.funcName = funcName;
       this.paramNum = paramNum;
     }
   }
 
   // TODO: python parser must be improved.
-  private static List<FuncInfo> getFunctions(InputStream is) throws 
IOException {
-    List<FuncInfo> functions = TUtil.newList();
+  private static List<FunctionInfo> getFunctions(InputStream is) throws 
IOException {
+    List<FunctionInfo> functions = TUtil.newList();
     InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset());
     BufferedReader br = new BufferedReader(in);
     String line = br.readLine();
-    String schemaString = null;
+    String[] quotedSchemaStrings = null;
+    AggFuncInfo aggFuncInfo = null;
     while (line != null) {
       if (pSchema.matcher(line).matches()) {
-        int start = line.indexOf("(") + 2; //drop brackets/quotes
-        int end = line.lastIndexOf(")") - 1;
-        schemaString = line.substring(start,end).trim();
+        int start = line.indexOf("(") + 1; //drop brackets
+        int end = line.lastIndexOf(")");
+        quotedSchemaStrings = line.substring(start,end).trim().split(",");
       } else if (pDef.matcher(line).matches()) {
+        boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0;
         int nameStart = line.indexOf("def ") + "def ".length();
         int nameEnd = line.indexOf('(');
         int signatureEnd = line.indexOf(')');
@@ -135,14 +185,48 @@ public class PythonScriptEngine extends TajoScriptEngine {
         } else {
           paramNum = params.length;
         }
+        if (params[0].trim().equals("self")) {
+          paramNum--;
+        }
 
         String functionName = line.substring(nameStart, nameEnd).trim();
-        schemaString = schemaString == null ? "blob" : schemaString;
-        functions.add(new FuncInfo(schemaString, functionName, paramNum));
-        schemaString = null;
+        quotedSchemaStrings = quotedSchemaStrings == null ? new String[] 
{"'blob'"} : quotedSchemaStrings;
+
+        if (isUdaf) {
+          if (functionName.equals("eval")) {
+            aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
+          } else if (functionName.equals("merge")) {
+            aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, 
functionName, paramNum);
+          } else if (functionName.equals("get_partial_result")) {
+            aggFuncInfo.getPartialResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
+          } else if (functionName.equals("get_final_result")) {
+            aggFuncInfo.getFinalResultInfo = new 
ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum);
+          }
+        } else {
+          aggFuncInfo = null;
+          functions.add(new ScalarFuncInfo(quotedSchemaStrings, functionName, 
paramNum));
+        }
+
+        quotedSchemaStrings = null;
+      } else if (pClass.matcher(line).matches()) {
+        // UDAF
+        if (aggFuncInfo != null) {
+          functions.add(aggFuncInfo);
+        }
+        aggFuncInfo = new AggFuncInfo();
+        int classNameStart = line.indexOf("class ") + "class ".length();
+        int classNameEnd = line.indexOf("(");
+        if (classNameEnd < 0) {
+          classNameEnd = line.indexOf(":");
+        }
+        aggFuncInfo.className = line.substring(classNameStart, 
classNameEnd).trim();
+        aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase();
       }
       line = br.readLine();
     }
+    if (aggFuncInfo != null) {
+      functions.add(aggFuncInfo);
+    }
     br.close();
     in.close();
     return functions;
@@ -162,12 +246,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
   private static final int PATH_TO_CONTROLLER_FILE = 1;
   private static final int UDF_FILE_NAME = 2; // Name of file where UDF 
function is defined
   private static final int UDF_FILE_PATH = 3; // Path to directory containing 
file where UDF function is defined
-  private static final int UDF_NAME = 4; // Name of UDF function being called.
-  private static final int PATH_TO_FILE_CACHE = 5; // Directory where required 
files (like tajo_util) are cached on cluster nodes.
-  private static final int STD_OUT_OUTPUT_PATH = 6; // File for output from 
when user writes to standard output.
-  private static final int STD_ERR_OUTPUT_PATH = 7; // File for output from 
when user writes to standard error.
-  private static final int CONTROLLER_LOG_FILE_PATH = 8; // Controller log 
file logs progress through the controller script not user code.
-  private static final int OUT_SCHEMA = 9; // the schema of the output column
+  private static final int PATH_TO_FILE_CACHE = 4; // Directory where required 
files (like tajo_util) are cached on cluster nodes.
+  private static final int STD_OUT_OUTPUT_PATH = 5; // File for output from 
when user writes to standard output.
+  private static final int STD_ERR_OUTPUT_PATH = 6; // File for output from 
when user writes to standard error.
+  private static final int CONTROLLER_LOG_FILE_PATH = 7; // Controller log 
file logs progress through the controller script not user code.
+  private static final int OUT_SCHEMA = 8; // the schema of the output column
+  private static final int FUNCTION_OR_CLASS_NAME = 9; // if FUNCTION_TYPE is 
UDF, function name; if FUNCTION_TYPE is UDAF, class name.
+  private static final int FUNCTION_TYPE = 10; // UDF or UDAF
 
   private Configuration systemConf;
 
@@ -182,36 +267,42 @@ public class PythonScriptEngine extends TajoScriptEngine {
 
   private final FunctionSignature functionSignature;
   private final PythonInvocationDesc invocationDesc;
-  private final Schema inSchema;
-  private final Schema outSchema;
-  private final int [] projectionCols = new int[]{0};
+  private Schema inSchema;
+  private Schema outSchema;
+  private int[] projectionCols;
 
   private final CSVLineSerDe lineSerDe = new CSVLineSerDe();
   private final TableMeta pipeMeta = 
CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE);
 
-  private static final Tuple EMPTY_INPUT = new VTuple(0);
+  private final Tuple EMPTY_INPUT = new VTuple(0);
+  private final Schema EMPTY_SCHEMA = new Schema();
 
   public PythonScriptEngine(FunctionDesc functionDesc) {
-    if (!functionDesc.getInvocation().hasPython()) {
+    if (!functionDesc.getInvocation().hasPython() && 
!functionDesc.getInvocation().hasPythonAggregation()) {
       throw new IllegalStateException("Function type must be 'python'");
     }
     functionSignature = functionDesc.getSignature();
     invocationDesc = functionDesc.getInvocation().getPython();
+    setSchema();
+  }
 
-    TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes();
-    inSchema = new Schema();
-    for (int i = 0; i < paramTypes.length; i++) {
-      inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+  public PythonScriptEngine(FunctionDesc functionDesc, boolean 
intermediatePhase, boolean finalPhase) {
+    if (!functionDesc.getInvocation().hasPython() && 
!functionDesc.getInvocation().hasPythonAggregation()) {
+      throw new IllegalStateException("Function type must be 'python'");
     }
-    outSchema = new Schema(new Column[]{new Column("out", 
functionSignature.getReturnType())});
+    functionSignature = functionDesc.getSignature();
+    invocationDesc = functionDesc.getInvocation().getPython();
+    this.intermediatePhase = intermediatePhase;
+    this.finalPhase = finalPhase;
+    setSchema();
   }
 
   @Override
   public void start(Configuration systemConf) throws IOException {
     this.systemConf = systemConf;
     startUdfController();
-    createInputHandlers();
     setStreams();
+    createInputHandlers();
     if (LOG.isDebugEnabled()) {
       LOG.debug("PythonScriptExecutor starts up");
     }
@@ -220,11 +311,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
   @Override
   public void shutdown() {
     process.destroy();
-    FileUtil.cleanup(LOG, stdin);
-    FileUtil.cleanup(LOG, stdout);
-    FileUtil.cleanup(LOG, stderr);
-    FileUtil.cleanup(LOG, inputHandler);
-    FileUtil.cleanup(LOG, outputHandler);
+    FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler);
     stdin = null;
     stdout = stderr = null;
     inputHandler = null;
@@ -245,7 +332,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
    * @throws IOException
    */
   private String[] buildCommand() throws IOException {
-    String[] command = new String[10];
+    String[] command = new String[11];
 
     // TODO: support controller logging
     String standardOutputRootWriteLocation = 
systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(),
@@ -269,7 +356,6 @@ public class PythonScriptEngine extends TajoScriptEngine {
     fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, 
fileName.length()-3) : fileName;
     command[UDF_FILE_NAME] = fileName;
     command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." : filePath.substring(0, 
lastSeparator - 1);
-    command[UDF_NAME] = funcName;
     String fileCachePath = 
systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname());
     if (fileCachePath == null) {
       throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " 
must be set.");
@@ -279,17 +365,53 @@ public class PythonScriptEngine extends TajoScriptEngine {
     command[STD_ERR_OUTPUT_PATH] = errOutFileName;
     command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
     command[OUT_SCHEMA] = 
outSchema.getColumn(0).getDataType().getType().name().toLowerCase();
+    command[FUNCTION_OR_CLASS_NAME] = funcName;
+    command[FUNCTION_TYPE] = invocationDesc.isScalarFunction() ? "UDF" : 
"UDAF";
 
     return command;
   }
 
-  private void createInputHandlers() {
-    TextLineSerializer serializer = lineSerDe.createSerializer(inSchema, 
pipeMeta);
+  private void setSchema() {
+    if (invocationDesc.isScalarFunction()) {
+      TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes();
+      inSchema = new Schema();
+      for (int i = 0; i < paramTypes.length; i++) {
+        inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+      }
+      outSchema = new Schema(new Column[]{new Column("out", 
functionSignature.getReturnType())});
+    } else {
+      // UDAF
+      if (!intermediatePhase && !finalPhase) {
+        // first phase
+        TajoDataTypes.DataType[] paramTypes = 
functionSignature.getParamTypes();
+        inSchema = new Schema();
+        for (int i = 0; i < paramTypes.length; i++) {
+          inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+        }
+        outSchema = new Schema(new Column[]{new Column("json", 
TajoDataTypes.Type.TEXT)});
+      } else if (intermediatePhase) {
+        inSchema = outSchema = new Schema(new Column[]{new Column("json", 
TajoDataTypes.Type.TEXT)});
+      } else if (finalPhase) {
+        inSchema = new Schema(new Column[]{new Column("json", 
TajoDataTypes.Type.TEXT)});
+        outSchema = new Schema(new Column[]{new Column("out", 
functionSignature.getReturnType())});
+      }
+    }
+    projectionCols = new int[outSchema.size()];
+    for (int i = 0; i < outSchema.size(); i++) {
+      projectionCols[i] = i;
+    }
+  }
+
+  private void createInputHandlers() throws IOException {
+    setSchema();
+    TextLineSerializer serializer = lineSerDe.createSerializer(pipeMeta);
     serializer.init();
     this.inputHandler = new InputHandler(serializer);
+    inputHandler.bindTo(stdin);
     TextLineDeserializer deserializer = 
lineSerDe.createDeserializer(outSchema, pipeMeta, projectionCols);
     deserializer.init();
     this.outputHandler = new OutputHandler(deserializer);
+    outputHandler.bindTo(stdout);
   }
 
   /**
@@ -297,13 +419,9 @@ public class PythonScriptEngine extends TajoScriptEngine {
    *
    * @throws IOException
    */
-  private void setStreams() throws IOException {
+  private void setStreams() {
     stdout = new DataInputStream(new 
BufferedInputStream(process.getInputStream()));
-    outputHandler.bindTo(stdout);
-
     stdin = new DataOutputStream(new 
BufferedOutputStream(process.getOutputStream()));
-    inputHandler.bindTo(stdin);
-
     stderr = new DataInputStream(new 
BufferedInputStream(process.getErrorStream()));
   }
 
@@ -342,16 +460,133 @@ public class PythonScriptEngine extends TajoScriptEngine 
{
     return controllerPath;
   }
 
-  public Datum eval(Tuple input) {
+  /**
+   * Call Python scalar functions.
+   *
+   * @param input input tuple
+   * @return evaluated result datum
+   */
+  @Override
+  public Datum callScalarFunc(Tuple input) {
     try {
-      if (input == null) {
-        // When nothing is passed into the UDF the tuple
-        // being sent is the full tuple for the relation.
-        // We want it to be nothing (since that's what the user wrote).
-        input = EMPTY_INPUT;
-      }
+      inputHandler.putNext(input, inSchema);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue", e);
+    }
+    Datum result;
+    try {
+      result = outputHandler.getNext().get(0);
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Call Python aggregation functions.
+   *
+   * @param functionContext python function context
+   * @param input input tuple
+   */
+  @Override
+  public void callAggFunc(FunctionContext functionContext, Tuple input) {
+
+    String methodName;
+    if (!intermediatePhase && !finalPhase) {
+      // eval
+      methodName = "eval";
+    } else {
+      // merge
+      methodName = "merge";
+    }
+
+    try {
+      inputHandler.putNext(methodName, input, inSchema);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue while 
executing " + methodName + " with " + input, e);
+    }
+
+    try {
+      outputHandler.getNext();
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+  }
+
+  /**
+   * Restore the intermediate result in Python UDAF with the snapshot stored 
in the function context.
+   *
+   * @param functionContext
+   */
+  public void updatePythonSideContext(PythonAggFunctionContext 
functionContext) throws IOException {
+
+    try {
+      inputHandler.putNext("update_context", functionContext);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue", e);
+    }
+    try {
+      outputHandler.getNext();
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+  }
+
+  /**
+   * Get the snapshot of the intermediate result in the Python UDAF.
+   *
+   * @param functionContext
+   */
+  public void updateJavaSideContext(PythonAggFunctionContext functionContext) 
throws IOException {
+
+    try {
+      inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue", e);
+    }
+    try {
+      outputHandler.getNext(functionContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+  }
+
+  /**
+   * Get intermediate result after the first stage.
+   *
+   * @param functionContext
+   * @return
+   */
+  @Override
+  public String getPartialResult(FunctionContext functionContext) {
+    try {
+      inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue", e);
+    }
+    try {
+      return outputHandler.getPartialResultString();
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+  }
 
-      inputHandler.putNext(input);
+  /**
+   * Get final result after the last stage.
+   *
+   * @param functionContext
+   * @return
+   */
+  @Override
+  public Datum getFinalResult(FunctionContext functionContext) {
+    try {
+      inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA);
       stdin.flush();
     } catch (Exception e) {
       throw new RuntimeException("Failed adding input to inputQueue", e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
index 726ec2f..c233fb8 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
@@ -19,7 +19,9 @@
 package org.apache.tajo.plan.function.python;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.*;
@@ -30,6 +32,9 @@ import java.net.URI;
  */
 public abstract class TajoScriptEngine {
 
+  protected boolean intermediatePhase = false;
+  protected boolean finalPhase = false;
+
   /**
    * Open a stream load a script locally or in the classpath
    * @param scriptPath the path of the script
@@ -79,5 +84,19 @@ public abstract class TajoScriptEngine {
    * @param input
    * @return
    */
-  public abstract Datum eval(Tuple input);
+  public abstract Datum callScalarFunc(Tuple input);
+
+  /**
+   * Passes one input tuple to the script-side aggregation function.
+   * NOTE(review): whether the context is updated in place is up to the implementation; confirm
+   * against the concrete engine.
+   *
+   * @param functionContext aggregation state associated with this call
+   * @param input           the input tuple for this aggregation step
+   */
+  public abstract void callAggFunc(FunctionContext functionContext, Tuple 
input);
+
+  /**
+   * Returns the intermediate aggregation result, as a string, after the first stage.
+   *
+   * @param functionContext aggregation state to read from
+   * @return the partial result in its textual form
+   */
+  public abstract String getPartialResult(FunctionContext functionContext);
+
+  /**
+   * Returns the final aggregation result after the last stage.
+   *
+   * @param functionContext aggregation state to read from
+   * @return the final result as a Datum
+   */
+  public abstract Datum getFinalResult(FunctionContext functionContext);
+
+  /**
+   * Records whether this engine is evaluating the intermediate phase of an aggregation.
+   *
+   * @param flag new value of {@code intermediatePhase}
+   */
+  public void setIntermediatePhase(boolean flag) {
+    intermediatePhase = flag;
+  }
+
+  /**
+   * Records whether this engine is evaluating the final phase of an aggregation.
+   *
+   * @param flag new value of {@code finalPhase}
+   */
+  public void setFinalPhase(boolean flag) {
+    finalPhase = flag;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
index 0b3c625..9b65e4b 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
@@ -19,24 +19,75 @@
 package org.apache.tajo.plan.function.stream;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
 import io.netty.util.internal.PlatformDependent;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.CommonTestingUtil;
+
+import java.lang.reflect.Field;
 
 /* this class is PooledBuffer holder */
 public class BufferPool {
 
-  private static final PooledByteBufAllocator allocator;
+  public static final String ALLOW_CACHE = 
"tajo.storage.buffer.thread-local.cache";
+  private static final ByteBufAllocator ALLOCATOR;
 
   private BufferPool() {
   }
 
   static {
-    //TODO we need determine the default params
-    allocator = new 
PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+    /* TODO Enable the thread-local cache.
+     * Outside of tests the pooled allocator is created with its thread-local caches disabled
+     * (unless ALLOW_CACHE is set to true), because TaskRunner threads are newly created.
+     * NOTE(review): presumably per-thread caches give little benefit to such threads; confirm.
+     */
+
+    if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, 
"FALSE").equalsIgnoreCase("TRUE")) {
+      /* Test runs: do not pool buffers, to keep memory usage down. */
+      ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
+
+      /* Test runs also turn on Netty's ADVANCED leak detection to help track down buffer leaks. */
+      ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+    } else {
+      // Direct buffers allowed; thread-local caching follows ALLOW_CACHE (default false);
+      // numCores = 0 means "use all available processors".
+      TajoConf tajoConf = new TajoConf();
+      ALLOCATOR = createPooledByteBufAllocator(true, 
tajoConf.getBoolean(ALLOW_CACHE, false), 0);
+    }
+  }
+
+  /**
+   * Creates a pooled ByteBuf allocator whose arena counts are capped by a core count.
+   * Borrowed from Apache Spark.
+   *
+   * @param allowDirectBufs whether direct (off-heap) arenas may be used; direct buffers are only
+   *                        preferred when Netty's {@code PlatformDependent} also prefers them
+   * @param allowCache      whether Netty's thread-local caches are enabled; when false the tiny,
+   *                        small and normal cache sizes are all set to 0
+   * @param numCores        upper bound for the number of heap/direct arenas; 0 or a negative value
+   *                        means "use {@code Runtime.availableProcessors()}"
+   * @return a new allocator configured from Netty's private default settings
+   * @throws RuntimeException if one of Netty's private default fields cannot be read
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores <= 0) {
+      // A negative value would otherwise flow into Math.min below and yield a negative arena count.
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+        allowDirectBufs && PlatformDependent.directBufferPreferred(),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+        getPrivateStaticField("DEFAULT_MAX_ORDER"),
+        allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
 
-    /* if you are finding memory leak, please enable this line */
-    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = 
PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public static long maxDirectMemory() {
@@ -45,7 +96,7 @@ public class BufferPool {
 
 
+  /**
+   * Allocates a direct buffer from ALLOCATOR, which is pooled normally and unpooled in test mode.
+   *
+   * @param size initial capacity in bytes
+   * @return the buffer returned by {@code ALLOCATOR.directBuffer(size)}
+   */
   public static ByteBuf directBuffer(int size) {
-    return allocator.directBuffer(size);
+    return ALLOCATOR.directBuffer(size);
   }
 
   /**
@@ -55,7 +106,7 @@ public class BufferPool {
    * @return allocated ByteBuf from pool
    */
   public static ByteBuf directBuffer(int size, int max) {
-    return allocator.directBuffer(size, max);
+    // 'max' is passed to Netty as the maximum capacity the returned buffer may grow to.
+    return ALLOCATOR.directBuffer(size, max);
   }
 
   @InterfaceStability.Unstable

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
index 1969d90..93c1364 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBufProcessor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.function.FunctionContext;
+import 
org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -51,6 +53,7 @@ public class CSVLineDeserializer extends TextLineDeserializer 
{
     fieldSerDer = new TextFieldSerializerDeserializer(meta);
   }
 
+  @Override
   public void deserialize(final ByteBuf lineBuf, Tuple output) throws 
IOException, TextLineParsingError {
     int[] projection = targetColumnIndexes;
     if (lineBuf == null || targetColumnIndexes == null || 
targetColumnIndexes.length == 0) {
@@ -75,7 +78,7 @@ public class CSVLineDeserializer extends TextLineDeserializer 
{
 
       if (projection.length > currentTarget && currentIndex == 
projection[currentTarget]) {
         lineBuf.setIndex(start, start + fieldLength);
-        Datum datum = fieldSerDer.deserialize(lineBuf, 
schema.getColumn(currentIndex), currentIndex, nullChars);
+        Datum datum = fieldSerDer.deserialize(lineBuf, 
schema.getColumn(currentIndex).getDataType(), nullChars);
         output.put(currentIndex, datum);
         currentTarget++;
       }
@@ -90,6 +93,18 @@ public class CSVLineDeserializer extends 
TextLineDeserializer {
   }
 
   @Override
+  public void deserialize(final ByteBuf lineBuf, FunctionContext context) 
throws IOException, TextLineParsingError {
+    PythonAggFunctionContext pythonContext = (PythonAggFunctionContext) 
context;
+    if (lineBuf == null) {
+      return;
+    }
+
+    byte[] bytes = new byte[lineBuf.readableBytes()];
+    lineBuf.readBytes(bytes);
+    pythonContext.setJsonData(new String(bytes));
+  }
+
+  @Override
   public void release() {
     if (nullChars != null) {
       nullChars.release();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
index 566e5c9..e9da196 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
@@ -31,8 +31,8 @@ public class CSVLineSerDe extends TextLineSerDe {
   }
 
+  /**
+   * Creates a CSV line serializer configured from the table meta. The schema is no longer bound
+   * here; it is supplied on each {@code serialize} call instead.
+   *
+   * @param meta table meta providing serializer options
+   * @return a new {@link CSVLineSerializer}
+   */
   @Override
-  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
-    return new CSVLineSerializer(schema, meta);
+  public TextLineSerializer createSerializer(TableMeta meta) {
+    return new CSVLineSerializer(meta);
   }
 
   public static byte[] getFieldDelimiter(TableMeta meta) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
index e1c7375..9db6632 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
@@ -24,6 +24,8 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.AnyDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.function.FunctionContext;
+import 
org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -34,41 +36,29 @@ public class CSVLineSerializer extends TextLineSerializer {
 
   private byte[] nullChars;
   private byte[] delimiter;
-  private int columnNum;
 
-  private final static String PARAM_DELIM = "|\t_";
+  public final static String PARAM_DELIM = "|\t_";
 
-  public CSVLineSerializer(Schema schema, TableMeta meta) {
-    super(schema, meta);
+  /**
+   * @param meta table meta; {@link #init()} reads the null-character representation from it
+   */
+  public CSVLineSerializer(TableMeta meta) {
+    super(meta);
   }
 
   @Override
   public void init() {
     nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
+    // "|,_" separates fields on the wire; OutputHandler.readValue() maps it back to the field delimiter.
     delimiter = "|,_".getBytes();
-    columnNum = schema.size();
 
     serde = new TextFieldSerializerDeserializer(meta);
   }
 
   @Override
-  public int serialize(OutputStream out, Tuple input) throws IOException {
+  public int serialize(OutputStream out, Tuple input, Schema schema) throws 
IOException {
     int writtenBytes = 0;
 
-    for (int i = 0; i < columnNum; i++) {
-      Datum datum = input.get(i);
-      String typeStr;
-      if (datum.type() == TajoDataTypes.Type.ANY) {
-        typeStr = getTypeString(((AnyDatum)datum).getActual());
-      } else {
-        typeStr = getTypeString(datum);
-      }
-      out.write(typeStr.getBytes());
-      out.write(PARAM_DELIM.getBytes());
-
-      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, 
nullChars);
+    for (int i = 0; i < input.size(); i++) {
+      writtenBytes += serializeDatum(out, input.get(i), 
schema.getColumn(i).getDataType());
 
-      if (columnNum - 1 > i) {
+      if (input.size() - 1 > i) {
         out.write(delimiter);
         writtenBytes += delimiter.length;
       }
@@ -77,12 +67,43 @@ public class CSVLineSerializer extends TextLineSerializer {
     return writtenBytes;
   }
 
+  /**
+   * Writes a single field as {@code <type tag><PARAM_DELIM><serialized value>}.
+   *
+   * @param out      destination stream
+   * @param datum    value to write; for ANY-typed datums the type tag comes from the wrapped value
+   * @param dataType declared type handed to the field serializer
+   * @return total number of bytes written, including the type tag and PARAM_DELIM
+   * @throws IOException if writing to {@code out} fails
+   */
+  private int serializeDatum(OutputStream out, Datum datum, TajoDataTypes.DataType dataType)
+      throws IOException {
+    String typeStr;
+    if (datum.type() == TajoDataTypes.Type.ANY) {
+      typeStr = getTypeString(((AnyDatum) datum).getActual());
+    } else {
+      typeStr = getTypeString(datum);
+    }
+    byte[] typeBytes = typeStr.getBytes();
+    byte[] paramDelimBytes = PARAM_DELIM.getBytes();
+    out.write(typeBytes);
+    out.write(paramDelimBytes);
+
+    // Count the tag and PARAM_DELIM as well: serialize() sums this result (plus delimiter.length)
+    // as its byte count, which previously omitted these prefix bytes.
+    return typeBytes.length + paramDelimBytes.length
+        + serde.serialize(out, datum, dataType, nullChars);
+  }
+
+  /**
+   * Writes the JSON payload of a Python aggregation context, or "-" when it has none.
+   *
+   * @param out     destination stream
+   * @param context must be a {@code PythonAggFunctionContext} (it is cast unconditionally)
+   * @return number of bytes written
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public int serializeContext(OutputStream out, FunctionContext context) throws IOException {
+    PythonAggFunctionContext pythonContext = (PythonAggFunctionContext) context;
+    String json = pythonContext.getJsonData();
+    // "-" is the same marker getTypeString() uses for NULL_TYPE.
+    byte[] payload = (json == null ? "-" : json).getBytes();
+    out.write(payload);
+    return payload.length;
+  }
+
   @Override
   public void release() {
+    // No-op: nullChars and delimiter are plain byte arrays.
+    // NOTE(review): confirm the field serde holds no resources that need releasing.
 
   }
 
-  private static String getTypeString(Datum val) {
+  public static String getTypeString(Datum val) {
     switch (val.type()) {
       case NULL_TYPE:
         return "-";

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
index 29a5aa2..340772e 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.tajo.plan.function.stream;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 
 import java.io.IOException;
@@ -28,9 +29,10 @@ import java.io.OutputStream;
 
 public interface FieldSerializerDeserializer {
 
-  int serialize(OutputStream out, Datum datum, Column col, int columnIndex, 
byte[] nullChars) throws IOException;
+  /**
+   * Writes one field value in text form.
+   *
+   * @param out       destination stream
+   * @param datum     value to write (implementations may receive null or a NullDatum)
+   * @param dataType  declared type used to choose the text encoding
+   * @param nullChars byte representation used for NULL values
+   * @return number of bytes written
+   * @throws IOException if writing fails
+   */
+  int serialize(OutputStream out, Datum datum, TajoDataTypes.DataType 
dataType, byte[] nullChars)
+      throws IOException;
 
-  Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf 
nullChars)
+  /**
+   * Parses one field value from its text form.
+   *
+   * @param buf       buffer positioned over exactly this field's bytes
+   * @param dataType  declared type of the field
+   * @param nullChars byte representation used for NULL values
+   * @return the parsed datum
+   * @throws IOException          if reading fails
+   * @throws TextLineParsingError if the text cannot be parsed as {@code dataType}
+   */
+  Datum deserialize(ByteBuf buf, TajoDataTypes.DataType dataType, ByteBuf 
nullChars)
       throws IOException, TextLineParsingError;
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
index dcc53c1..fd2c3ee 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.plan.function.stream;
 
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.function.PythonAggFunctionInvoke;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.Closeable;
@@ -50,8 +52,21 @@ public class InputHandler implements Closeable {
    * @param t input <code>Tuple</code>
    * @throws IOException
    */
-  public void putNext(Tuple t) throws IOException {
-    serializer.serialize(out, t);
+  public void putNext(Tuple t, Schema schema) throws IOException {
+    serializer.serialize(out, t, schema);
+    out.write(END_OF_RECORD_DELIM);
+  }
+
+  /**
+   * Writes a method-call record: a method header, the serialized tuple, and END_OF_RECORD_DELIM.
+   *
+   * @param methodName name of the method the external process should run,
+   *                   e.g. {@code "get_partial_result"}
+   * @param t          arguments to serialize
+   * @param schema     schema describing {@code t}
+   * @throws IOException if writing to the output stream fails
+   */
+  public void putNext(String methodName, Tuple t, Schema schema) throws IOException {
+    writeMethodHeader(methodName);
+    putNext(t, schema);
+  }
+
+  /** Writes the {@code |<methodName>_<PARAM_DELIM>} header that precedes a method-call payload. */
+  private void writeMethodHeader(String methodName) throws IOException {
+    out.write(("|" + methodName + "_" + CSVLineSerializer.PARAM_DELIM).getBytes());
+  }
+
+  /**
+   * Writes a method-call record whose payload is a serialized aggregation context, followed by
+   * END_OF_RECORD_DELIM.
+   *
+   * @param methodName name of the method the external process should run
+   * @param context    aggregation context to serialize
+   * @throws IOException if writing to the output stream fails
+   */
+  public void putNext(String methodName, PythonAggFunctionInvoke.PythonAggFunctionContext context)
+      throws IOException {
+    writeMethodHeader(methodName);
+    serializer.serializeContext(out, context);
     out.write(END_OF_RECORD_DELIM);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
index c5eb419..1eeb508 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
@@ -20,8 +20,12 @@ package org.apache.tajo.plan.function.stream;
 
 import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.FileUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -32,6 +36,8 @@ import java.io.InputStream;
  * Tajo-Streaming external command.
  */
 public class OutputHandler implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(OutputHandler.class);
   private static int DEFAULT_BUFFER = 64 * 1024;
   private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes();
 
@@ -43,18 +49,21 @@ public class OutputHandler implements Closeable {
 
   private InputStream istream;
 
-  private final ByteBuf buf = BufferPool.directBuffer(DEFAULT_BUFFER);
+  private ByteBuf buf;
 
   // Both of these ignore the trailing "\n".  So if the default delimiter is 
"\n", recordDelimStr is "".
   private String recordDelimStr = null;
   private int recordDelimLength = 0;
-  private final Tuple tuple = new VTuple(1);
+  private final Tuple tuple;
 
   // flag to mark if close() has already been called
   private boolean alreadyClosed = false;
+  private final String FIELD_DELIM;
 
   public OutputHandler(TextLineDeserializer deserializer) {
     this.deserializer = deserializer;
+    FIELD_DELIM = new 
String(CSVLineSerDe.getFieldDelimiter(deserializer.meta));
+    tuple = new VTuple(deserializer.schema.size());
   }
 
   /**
@@ -68,6 +77,7 @@ public class OutputHandler implements Closeable {
   public void bindTo(InputStream is) throws IOException {
     this.istream  = is;
     this.in = new ByteBufLineReader(new ByteBufInputChannel(istream));
+    this.buf = BufferPool.directBuffer(DEFAULT_BUFFER);
   }
 
   /**
@@ -95,6 +105,38 @@ public class OutputHandler implements Closeable {
     return tuple;
   }
 
+  /**
+   * Reads the next output record and returns it as text (as post-processed by readValue()).
+   *
+   * @return the record string, or null if the handler is not bound or readValue() reports no value
+   * @throws IOException if reading fails
+   */
+  public String getPartialResultString() throws IOException {
+    if (in != null) {
+      currValue = null;
+      if (readValue()) {
+        return currValue;
+      }
+    }
+    return null;
+  }
+
+  public FunctionContext getNext(FunctionContext context) throws IOException {
+    if (in == null) {
+      return null;
+    }
+
+    currValue = null;
+    if (!readValue()) {
+      return null;
+    }
+    buf.clear();
+    buf.writeBytes(currValue.getBytes());
+    try {
+      deserializer.deserialize(buf, context);
+    } catch (TextLineParsingError textLineParsingError) {
+      throw new IOException(textLineParsingError);
+    }
+
+    return context;
+  }
+
   private boolean readValue() throws IOException {
     currValue = in.readLine();
     if (currValue == null) {
@@ -113,10 +155,25 @@ public class OutputHandler implements Closeable {
       currValue += new String(lineBytes);
     }
 
-    if (currValue.contains("|_")) {
-      int pos = currValue.lastIndexOf("|_");
-      currValue = currValue.substring(0, pos);
+    String line = currValue;
+
+    int candidate = -1;
+    StringBuffer sb = new StringBuffer();
+    while ((candidate=line.indexOf("|")) >= 0) {
+      if (line.substring(candidate, candidate+2).equals("|_")) {
+        // record end
+        sb.append(line.substring(0, candidate));
+        break;
+      } else if (line.substring(candidate, candidate+3).equals("|,_")) {
+        sb.append(line.substring(0, candidate)).append(FIELD_DELIM);
+        line = line.substring(candidate+3, line.length());
+      } else if (line.substring(candidate, candidate+3).equals("|-_")) {
+        // null value
+        sb.append(FIELD_DELIM);
+        line = line.substring(candidate+3, line.length());
+      }
     }
+    currValue = sb.toString();
 
     return true;
   }
@@ -148,9 +205,13 @@ public class OutputHandler implements Closeable {
    */
   public void close() throws IOException {
     if(!alreadyClosed) {
-      istream.close();
+      FileUtil.cleanup(LOG, istream, in);
+      in = null;
       istream = null;
       alreadyClosed = true;
+      // buf is only allocated by bindTo(), so it is still null if the handler was never bound.
+      if (this.buf != null && this.buf.refCnt() > 0) {
+        this.buf.release();
+      }
+      this.buf = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
index 9bb60dd..46558c1 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.plan.function.stream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.util.TUtil;
 
 import java.io.File;
@@ -26,6 +28,8 @@ import java.util.Map;
 
 public class StreamingUtil {
 
+  private static final Log LOG = LogFactory.getLog(StreamingUtil.class);
+
   private static final String BASH = "bash";
   private static final String PATH = "PATH";
 
@@ -59,6 +63,14 @@ public class StreamingUtil {
       cmdArgs.add(sb.toString());
     }
 
+    if (LOG.isDebugEnabled()) {
+      StringBuffer sb = new StringBuffer("command: ");
+      for (String cmd : cmdArgs) {
+        sb.append(cmd).append(" ");
+      }
+      LOG.debug(sb.toString());
+    }
+
     // Start the external process
     ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs
         .toArray(new String[cmdArgs.size()]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
index cd9518b..b6d5020 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
@@ -23,7 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.CharsetUtil;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.TajoConstants;
-import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
@@ -60,11 +60,10 @@ public class TextFieldSerializerDeserializer implements 
FieldSerializerDeseriali
   }
 
   @Override
-  public int serialize(OutputStream out, Datum datum, Column col, int 
columnIndex, byte[] nullChars)
+  public int serialize(OutputStream out, Datum datum, TajoDataTypes.DataType 
dataType, byte[] nullChars)
       throws IOException {
     byte[] bytes;
     int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
 
     if (datum == null || datum instanceof NullDatum) {
       switch (dataType.getType()) {
@@ -139,7 +138,8 @@ public class TextFieldSerializerDeserializer implements 
FieldSerializerDeseriali
         break;
       case ANY:
         AnyDatum anyDatum = (AnyDatum) datum;
-        length = serialize(out, anyDatum.getActual(), new Column("any", 
anyDatum.getActual().type()), 0, nullChars);
+        length = serialize(out, anyDatum.getActual(), 
CatalogUtil.newSimpleDataType(anyDatum.getActual().type()),
+            nullChars);
         break;
       default:
         throw new UnsupportedException(dataType.getType().name());
@@ -148,9 +148,9 @@ public class TextFieldSerializerDeserializer implements 
FieldSerializerDeseriali
   }
 
   @Override
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf 
nullChars) throws IOException {
+  public Datum deserialize(ByteBuf buf, TajoDataTypes.DataType dataType, 
ByteBuf nullChars) throws IOException {
     Datum datum;
-    TajoDataTypes.Type type = col.getDataType().getType();
+    TajoDataTypes.Type type = dataType.getType();
     boolean nullField;
     if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
       nullField = isNullText(buf, nullChars);
@@ -224,7 +224,7 @@ public class TextFieldSerializerDeserializer implements 
FieldSerializerDeseriali
               decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
           break;
         case PROTOBUF: {
-          ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(col.getDataType());
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(dataType);
           Message.Builder builder = factory.newBuilder();
           try {
             byte[] bytes = new byte[buf.readableBytes()];

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
index c2ea30c..19f414f 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.plan.function.stream;
 import io.netty.buffer.ByteBuf;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -53,6 +54,8 @@ public abstract class TextLineDeserializer {
    */
   public abstract void deserialize(final ByteBuf buf, Tuple output) throws 
IOException, TextLineParsingError;
 
+  /**
+   * Deserializes one line of script output into an aggregation function context.
+   *
+   * @param buf     buffer holding the line
+   * @param context context to populate
+   * @throws IOException          if reading fails
+   * @throws TextLineParsingError if the line cannot be parsed
+   */
+  public abstract void deserialize(final ByteBuf buf, FunctionContext context) 
throws IOException, TextLineParsingError;
+
   /**
    * Release external resources
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
index 89b169c..8cd3e56 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
@@ -37,7 +37,7 @@ public abstract class TextLineSerDe {
 
   public abstract TextLineDeserializer createDeserializer(Schema schema, 
TableMeta meta, int [] targetColumnIndexes);
 
-  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta 
meta);
+  /**
+   * Creates a line serializer configured from the table meta. The schema is passed later, on each
+   * {@code TextLineSerializer.serialize} call.
+   *
+   * @param meta table meta providing serializer options
+   * @return a new serializer
+   */
+  public abstract TextLineSerializer createSerializer(TableMeta meta);
 
   public static ByteBuf getNullChars(TableMeta meta) {
     byte[] nullCharByteArray = getNullCharsAsBytes(meta);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
index ea8576c..71fdd23 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
@@ -20,6 +20,7 @@ package org.apache.tajo.plan.function.stream;
 
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -29,17 +30,17 @@ import java.io.OutputStream;
  * Write a Tuple into single text formatted line
  */
 public abstract class TextLineSerializer {
-  protected Schema schema;
   protected TableMeta meta;
 
-  public TextLineSerializer(Schema schema, TableMeta meta) {
-    this.schema = schema;
+  /**
+   * @param meta table meta from which implementations read their options (e.g. null characters)
+   */
+  public TextLineSerializer(TableMeta meta) {
     this.meta = meta;
   }
 
+  /**
+   * Prepares serializer state derived from {@code meta}.
+   * NOTE(review): presumably must be called before serialize/serializeContext; confirm with callers.
+   */
   public abstract void init();
 
-  public abstract int serialize(OutputStream out, Tuple input) throws 
IOException;
+  /**
+   * Writes a tuple as one text record. InputHandler appends the end-of-record delimiter itself.
+   *
+   * @param out    destination stream
+   * @param input  tuple to write
+   * @param schema schema describing {@code input}
+   * @return number of bytes written, as counted by the implementation
+   * @throws IOException if writing fails
+   */
+  public abstract int serialize(OutputStream out, Tuple input, Schema schema) 
throws IOException;
+
+  /**
+   * Writes an aggregation function context in place of a tuple payload.
+   *
+   * @param out     destination stream
+   * @param context context to write
+   * @return number of bytes written
+   * @throws IOException if writing fails
+   */
+  public abstract int serializeContext(OutputStream out, FunctionContext 
context) throws IOException;
 
+  /** Releases any resources held by the serializer. */
   public abstract void release();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
index 3ca76ee..80c18cc 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
@@ -186,10 +186,9 @@ public class EvalNodeDeserializer {
               evalContext.addScriptEngine(current, new 
PythonScriptEngine(funcDesc));
             }
           } else if (type == EvalType.AGG_FUNCTION || type == 
EvalType.WINDOW_FUNCTION) {
-            AggFunction instance = (AggFunction) funcDesc.newInstance();
             if (type == EvalType.AGG_FUNCTION) {
               AggregationFunctionCallEval aggFunc =
-                  new AggregationFunctionCallEval(new 
FunctionDesc(funcProto.getFuncion()), instance, params);
+                  new AggregationFunctionCallEval(new 
FunctionDesc(funcProto.getFuncion()), params);
 
               PlanProto.AggFunctionEvalSpec aggFunctionProto = 
protoNode.getAggFunction();
               
aggFunc.setIntermediatePhase(aggFunctionProto.getIntermediatePhase());
@@ -199,11 +198,16 @@ public class EvalNodeDeserializer {
               }
               current = aggFunc;
 
+              if (evalContext != null && 
funcDesc.getInvocation().hasPythonAggregation()) {
+                evalContext.addScriptEngine(current, new 
PythonScriptEngine(funcDesc,
+                    aggFunc.isIntermediatePhase(), aggFunc.isFinalPhase()));
+              }
+
             } else {
               WinFunctionEvalSpec windowFuncProto = protoNode.getWinFunction();
 
               WindowFunctionEval winFunc =
-                  new WindowFunctionEval(new 
FunctionDesc(funcProto.getFuncion()), instance, params,
+                  new WindowFunctionEval(new 
FunctionDesc(funcProto.getFuncion()), params,
                       convertWindowFrame(windowFuncProto.getWindowFrame()));
 
               if (windowFuncProto.getSortSpecCount() > 0) {

Reply via email to