http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
new file mode 100644
index 0000000..728ae10
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalContext;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * An abstract class for actual function invocation.
+ * The metadata for function invocation are stored in the {@link 
org.apache.tajo.function.FunctionInvocation} class.
+ */
+public abstract class FunctionInvoke implements Cloneable {
+  @Expose protected FunctionDesc functionDesc;
+
+  public FunctionInvoke() {
+
+  }
+
+  public FunctionInvoke(FunctionDesc functionDesc) {
+    this.functionDesc = functionDesc;
+  }
+
+  public static FunctionInvoke newInstance(FunctionDesc desc) throws 
InternalException {
+    if (desc.getInvocation().hasLegacy()) {
+      return new LegacyScalarFunctionInvoke(desc);
+    } else if (desc.getInvocation().hasPython()) {
+      return new PythonFunctionInvoke(desc);
+    } else {
+      throw new UnsupportedException(desc.getInvocation() + " is not 
supported");
+    }
+  }
+
+  public void setFunctionDesc(FunctionDesc functionDesc) throws 
InternalException {
+    this.functionDesc = functionDesc;
+  }
+
+  public abstract void init(FunctionInvokeContext context) throws IOException;
+
+  /**
+   * Evaluate the given tuple with a function
+   * @param tuple a tuple evaluated with parameters
+   * @return a result of a fuction execution
+   */
+  public abstract Datum eval(Tuple tuple);
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof FunctionInvoke) {
+      FunctionInvoke other = (FunctionInvoke) o;
+      return this.functionDesc.equals(other.functionDesc);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return functionDesc.hashCode();
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    FunctionInvoke clone = (FunctionInvoke) super.clone();
+    clone.functionDesc = (FunctionDesc) this.functionDesc.clone();
+    return clone;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java
new file mode 100644
index 0000000..b938072
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.plan.expr.FunctionEval;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+
+import java.util.Arrays;
+
+/**
+ * This class contains some metadata need to execute functions.
+ */
+public class FunctionInvokeContext {
+  private final OverridableConf queryContext;
+  private final FunctionEval.ParamType[] paramTypes;
+  private TajoScriptEngine scriptEngine;
+
+  public FunctionInvokeContext(OverridableConf queryContext, 
FunctionEval.ParamType[] paramTypes) {
+    this.queryContext = queryContext;
+    this.paramTypes = paramTypes;
+  }
+
+  public OverridableConf getQueryContext() {
+    return queryContext;
+  }
+
+  public FunctionEval.ParamType[] getParamTypes() {
+    return paramTypes;
+  }
+
+  public void setScriptEngine(TajoScriptEngine scriptEngine) {
+    this.scriptEngine = scriptEngine;
+  }
+
+  public boolean hasScriptExecutor() {
+    return scriptEngine != null;
+  }
+
+  public TajoScriptEngine getScriptEngine() {
+    return scriptEngine;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(queryContext, Arrays.hashCode(paramTypes));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof FunctionInvokeContext) {
+      FunctionInvokeContext other = (FunctionInvokeContext) o;
+      return queryContext.equals(other.queryContext) && 
Arrays.equals(paramTypes, other.paramTypes);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
new file mode 100644
index 0000000..6b2c116
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.plan.expr.EvalContext;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * This class invokes the legacy scala functions.
+ */
+public class LegacyScalarFunctionInvoke extends FunctionInvoke implements 
Cloneable {
+  @Expose private GeneralFunction function;
+
+  public LegacyScalarFunctionInvoke() {
+
+  }
+
+  public LegacyScalarFunctionInvoke(FunctionDesc funcDesc) throws 
InternalException {
+    super(funcDesc);
+    function = (GeneralFunction) funcDesc.newInstance();
+  }
+
+  @Override
+  public void setFunctionDesc(FunctionDesc desc) throws InternalException {
+    super.setFunctionDesc(desc);
+    function = (GeneralFunction) functionDesc.newInstance();
+  }
+
+  @Override
+  public void init(FunctionInvokeContext context) {
+    function.init(context.getQueryContext(), context.getParamTypes());
+  }
+
+  @Override
+  public Datum eval(Tuple tuple) {
+    return function.eval(tuple);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof LegacyScalarFunctionInvoke) {
+      LegacyScalarFunctionInvoke other = (LegacyScalarFunctionInvoke) o;
+      return super.equals(other) &&
+          TUtil.checkEquals(function, other.function);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return function.hashCode();
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    LegacyScalarFunctionInvoke clone = (LegacyScalarFunctionInvoke) 
super.clone();
+    clone.function = (GeneralFunction) function.clone();
+    return clone;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
new file mode 100644
index 0000000..1019c60
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * This class invokes the python functions.
+ */
+public class PythonFunctionInvoke extends FunctionInvoke implements Cloneable {
+
+  private transient PythonScriptEngine scriptEngine;
+
+  public PythonFunctionInvoke() {
+
+  }
+
+  public PythonFunctionInvoke(FunctionDesc functionDesc) {
+    super(functionDesc);
+  }
+
+  @Override
+  public void init(FunctionInvokeContext context) throws IOException {
+    this.scriptEngine = (PythonScriptEngine) context.getScriptEngine();
+  }
+
+  @Override
+  public Datum eval(Tuple tuple) {
+    Datum res = scriptEngine.eval(tuple);
+    return res;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    // nothing to do
+    return super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
new file mode 100644
index 0000000..0da30f1
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.python;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.function.FunctionInvocation;
+import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.function.FunctionSupplement;
+import org.apache.tajo.function.PythonInvocationDesc;
+import org.apache.tajo.plan.function.stream.*;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.*;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * {@link PythonScriptEngine} is responsible for registering python functions 
and maintaining the controller process.
+ * The controller is a python process that executes the python UDFs.
+ * (Please refer to 'tajo-core/src/main/resources/python/controller.py')
+ * Data are exchanged via standard I/O between PythonScriptEngine and the 
controller.
+ */
+public class PythonScriptEngine extends TajoScriptEngine {
+
+  private static final Log LOG = LogFactory.getLog(PythonScriptEngine.class);
+
+  public static final String FILE_EXTENSION = ".py";
+
+  /**
+   * Register functions defined in a python script
+   *
+   * @param path path to the python script file
+   * @param namespace namespace where the functions will be defined
+   * @return set of function descriptions
+   * @throws IOException
+   */
+  public static Set<FunctionDesc> registerFunctions(URI path, String 
namespace) throws IOException {
+    // TODO: we should support the namespace for python functions.
+
+    Set<FunctionDesc> functionDescs = TUtil.newHashSet();
+
+    InputStream in = getScriptAsStream(path);
+    List<FuncInfo> functions = null;
+    try {
+      functions = getFunctions(in);
+    } finally {
+      in.close();
+    }
+    for(FuncInfo funcInfo : functions) {
+      TajoDataTypes.DataType returnType = 
CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(funcInfo.returnType));
+      FunctionSignature signature = new 
FunctionSignature(CatalogProtos.FunctionType.UDF, funcInfo.funcName,
+          returnType, createParamTypes(funcInfo.paramNum));
+      FunctionInvocation invocation = new FunctionInvocation();
+      PythonInvocationDesc invocationDesc = new 
PythonInvocationDesc(funcInfo.funcName, path.getPath());
+      invocation.setPython(invocationDesc);
+      FunctionSupplement supplement = new FunctionSupplement();
+      functionDescs.add(new FunctionDesc(signature, invocation, supplement));
+    }
+    return functionDescs;
+  }
+
+  private static TajoDataTypes.DataType[] createParamTypes(int paramNum) {
+    TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[paramNum];
+    for (int i = 0; i < paramNum; i++) {
+      paramTypes[i] = 
TajoDataTypes.DataType.newBuilder().setType(TajoDataTypes.Type.ANY).build();
+    }
+    return paramTypes;
+  }
+
+  private static final Pattern pSchema = 
Pattern.compile("^\\s*\\W+outputType.*");
+  private static final Pattern pDef = 
Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");
+
+  private static class FuncInfo {
+    String returnType;
+    String funcName;
+    int paramNum;
+
+    public FuncInfo(String returnType, String funcName, int paramNum) {
+      this.returnType = returnType.toUpperCase();
+      this.funcName = funcName;
+      this.paramNum = paramNum;
+    }
+  }
+
+  // TODO: python parser must be improved.
+  private static List<FuncInfo> getFunctions(InputStream is) throws 
IOException {
+    List<FuncInfo> functions = TUtil.newList();
+    InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset());
+    BufferedReader br = new BufferedReader(in);
+    String line = br.readLine();
+    String schemaString = null;
+    while (line != null) {
+      if (pSchema.matcher(line).matches()) {
+        int start = line.indexOf("(") + 2; //drop brackets/quotes
+        int end = line.lastIndexOf(")") - 1;
+        schemaString = line.substring(start,end).trim();
+      } else if (pDef.matcher(line).matches()) {
+        int nameStart = line.indexOf("def ") + "def ".length();
+        int nameEnd = line.indexOf('(');
+        int signatureEnd = line.indexOf(')');
+        String[] params = line.substring(nameEnd+1, signatureEnd).split(",");
+        int paramNum;
+        if (params.length == 1) {
+          paramNum = params[0].equals("") ? 0 : 1;
+        } else {
+          paramNum = params.length;
+        }
+
+        String functionName = line.substring(nameStart, nameEnd).trim();
+        schemaString = schemaString == null ? "blob" : schemaString;
+        functions.add(new FuncInfo(schemaString, functionName, paramNum));
+        schemaString = null;
+      }
+      line = br.readLine();
+    }
+    br.close();
+    in.close();
+    return functions;
+  }
+
+
+  private static final String PYTHON_LANGUAGE = "python";
+  private static final String PYTHON_ROOT_PATH = "/python";
+  private static final String TAJO_UTIL_NAME = "tajo_util.py";
+  private static final String CONTROLLER_NAME = "controller.py";
+  private static final String PYTHON_CONTROLLER_JAR_PATH = PYTHON_ROOT_PATH + 
File.separator + CONTROLLER_NAME; // Relative to root of tajo jar.
+  private static final String PYTHON_TAJO_UTIL_PATH = PYTHON_ROOT_PATH + 
File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar.
+  private static final String DEFAULT_LOG_DIR = "/tmp/tajo-" + 
System.getProperty("user.name") + "/python";
+
+  // Indexes for arguments being passed to external process
+  private static final int UDF_LANGUAGE = 0;
+  private static final int PATH_TO_CONTROLLER_FILE = 1;
+  private static final int UDF_FILE_NAME = 2; // Name of file where UDF 
function is defined
+  private static final int UDF_FILE_PATH = 3; // Path to directory containing 
file where UDF function is defined
+  private static final int UDF_NAME = 4; // Name of UDF function being called.
+  private static final int PATH_TO_FILE_CACHE = 5; // Directory where required 
files (like tajo_util) are cached on cluster nodes.
+  private static final int STD_OUT_OUTPUT_PATH = 6; // File for output from 
when user writes to standard output.
+  private static final int STD_ERR_OUTPUT_PATH = 7; // File for output from 
when user writes to standard error.
+  private static final int CONTROLLER_LOG_FILE_PATH = 8; // Controller log 
file logs progress through the controller script not user code.
+  private static final int OUT_SCHEMA = 9; // the schema of the output column
+
+  private Configuration systemConf;
+
+  private Process process; // Handle to the external execution of python 
functions
+
+  private InputHandler inputHandler;
+  private OutputHandler outputHandler;
+
+  private DataOutputStream stdin; // stdin of the process
+  private InputStream stdout; // stdout of the process
+  private InputStream stderr; // stderr of the process
+
+  private final FunctionSignature functionSignature;
+  private final PythonInvocationDesc invocationDesc;
+  private final Schema inSchema;
+  private final Schema outSchema;
+  private final int [] projectionCols = new int[]{0};
+
+  private final CSVLineSerDe lineSerDe = new CSVLineSerDe();
+  private final TableMeta pipeMeta = 
CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE);
+
+  private static final Tuple EMPTY_INPUT = new VTuple(0);
+
+  public PythonScriptEngine(FunctionDesc functionDesc) {
+    if (!functionDesc.getInvocation().hasPython()) {
+      throw new IllegalStateException("Function type must be 'python'");
+    }
+    functionSignature = functionDesc.getSignature();
+    invocationDesc = functionDesc.getInvocation().getPython();
+
+    TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes();
+    inSchema = new Schema();
+    for (int i = 0; i < paramTypes.length; i++) {
+      inSchema.addColumn(new Column("in_" + i, paramTypes[i]));
+    }
+    outSchema = new Schema(new Column[]{new Column("out", 
functionSignature.getReturnType())});
+  }
+
+  @Override
+  public void start(Configuration systemConf) throws IOException {
+    this.systemConf = systemConf;
+    startUdfController();
+    createInputHandlers();
+    setStreams();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PythonScriptExecutor starts up");
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    process.destroy();
+    FileUtil.cleanup(LOG, stdin);
+    FileUtil.cleanup(LOG, stdout);
+    FileUtil.cleanup(LOG, stderr);
+    FileUtil.cleanup(LOG, inputHandler);
+    FileUtil.cleanup(LOG, outputHandler);
+    stdin = null;
+    stdout = stderr = null;
+    inputHandler = null;
+    outputHandler = null;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PythonScriptExecutor shuts down");
+    }
+  }
+
+  private void startUdfController() throws IOException {
+    ProcessBuilder processBuilder = 
StreamingUtil.createProcess(buildCommand());
+    process = processBuilder.start();
+  }
+
+  /**
+   * Build a command to execute an external process.
+   * @return
+   * @throws IOException
+   */
+  private String[] buildCommand() throws IOException {
+    String[] command = new String[10];
+
+    // TODO: support controller logging
+    String standardOutputRootWriteLocation = 
systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(),
+        DEFAULT_LOG_DIR);
+    if (!standardOutputRootWriteLocation.equals(DEFAULT_LOG_DIR)) {
+      LOG.warn("Currently, logging is not supported for the python 
controller.");
+    }
+    String controllerLogFileName, outFileName, errOutFileName;
+
+    String funcName = invocationDesc.getName();
+    String filePath = invocationDesc.getPath();
+
+    controllerLogFileName = standardOutputRootWriteLocation + funcName + 
"_controller.log";
+    outFileName = standardOutputRootWriteLocation + funcName + ".out";
+    errOutFileName = standardOutputRootWriteLocation + funcName + ".err";
+
+    command[UDF_LANGUAGE] = PYTHON_LANGUAGE;
+    command[PATH_TO_CONTROLLER_FILE] = getControllerPath();
+    int lastSeparator = filePath.lastIndexOf(File.separator) + 1;
+    String fileName = filePath.substring(lastSeparator);
+    fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, 
fileName.length()-3) : fileName;
+    command[UDF_FILE_NAME] = fileName;
+    command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." : filePath.substring(0, 
lastSeparator - 1);
+    command[UDF_NAME] = funcName;
+    String fileCachePath = 
systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname());
+    if (fileCachePath == null) {
+      throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " 
must be set.");
+    }
+    command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'";
+    command[STD_OUT_OUTPUT_PATH] = outFileName;
+    command[STD_ERR_OUTPUT_PATH] = errOutFileName;
+    command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
+    command[OUT_SCHEMA] = 
outSchema.getColumn(0).getDataType().getType().name().toLowerCase();
+
+    return command;
+  }
+
+  private void createInputHandlers() {
+    TextLineSerializer serializer = lineSerDe.createSerializer(inSchema, 
pipeMeta);
+    serializer.init();
+    this.inputHandler = new InputHandler(serializer);
+    TextLineDeserializer deserializer = 
lineSerDe.createDeserializer(outSchema, pipeMeta, projectionCols);
+    deserializer.init();
+    this.outputHandler = new OutputHandler(deserializer);
+  }
+
+  /**
+   * Get the standard input, output, and error streams of the external process
+   *
+   * @throws IOException
+   */
+  private void setStreams() throws IOException {
+    stdout = new DataInputStream(new 
BufferedInputStream(process.getInputStream()));
+    outputHandler.bindTo(stdout);
+
+    stdin = new DataOutputStream(new 
BufferedOutputStream(process.getOutputStream()));
+    inputHandler.bindTo(stdin);
+
+    stderr = new DataInputStream(new 
BufferedInputStream(process.getErrorStream()));
+  }
+
+  /**
+   * Find the path to the controller file for the streaming language.
+   *
+   * First check path to job jar and if the file is not found (like in the
+   * case of running hadoop in standalone mode) write the necessary files
+   * to temporary files and return that path.
+   *
+   * @return
+   * @throws IOException
+   */
+  private String getControllerPath() throws IOException {
+    String controllerPath = PYTHON_CONTROLLER_JAR_PATH;
+    File controller = new File(PYTHON_CONTROLLER_JAR_PATH);
+    if (!controller.exists()) {
+      File controllerFile = File.createTempFile("controller", FILE_EXTENSION);
+      InputStream pythonControllerStream = 
this.getClass().getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
+      try {
+        FileUtils.copyInputStreamToFile(pythonControllerStream, 
controllerFile);
+      } finally {
+        pythonControllerStream.close();
+      }
+      controllerFile.deleteOnExit();
+      File tajoUtilFile = new File(controllerFile.getParent() + File.separator 
+ TAJO_UTIL_NAME);
+      tajoUtilFile.deleteOnExit();
+      InputStream pythonUtilStream = 
this.getClass().getResourceAsStream(PYTHON_TAJO_UTIL_PATH);
+      try {
+        FileUtils.copyInputStreamToFile(pythonUtilStream, tajoUtilFile);
+      } finally {
+        pythonUtilStream.close();
+      }
+      controllerPath = controllerFile.getAbsolutePath();
+    }
+    return controllerPath;
+  }
+
+  public Datum eval(Tuple input) {
+    try {
+      if (input == null) {
+        // When nothing is passed into the UDF the tuple
+        // being sent is the full tuple for the relation.
+        // We want it to be nothing (since that's what the user wrote).
+        input = EMPTY_INPUT;
+      }
+
+      inputHandler.putNext(input);
+      stdin.flush();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed adding input to inputQueue", e);
+    }
+    Datum result;
+    try {
+      result = outputHandler.getNext().get(0);
+    } catch (Exception e) {
+      throw new RuntimeException("Problem getting output", e);
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
new file mode 100644
index 0000000..726ec2f
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.python;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.*;
+import java.net.URI;
+
+/**
+ * Abstract class of script engine
+ */
+public abstract class TajoScriptEngine {
+
+  /**
+   * Open a stream load a script locally or in the classpath
+   * @param scriptPath the path of the script
+   * @return a stream (it is the responsibility of the caller to close it)
+   * @throws IllegalStateException if we could not open a stream
+   */
+  protected static InputStream getScriptAsStream(URI scriptPath) {
+    InputStream is = null;
+    File file;
+    try {
+      file = new File(scriptPath);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("path: " + scriptPath, e);
+    }
+    if (file.exists()) {
+      try {
+        is = new FileInputStream(file);
+      } catch (FileNotFoundException e) {
+        throw new IllegalStateException("could not find existing file 
"+scriptPath, e);
+      }
+    }
+
+    if (is == null) {
+      throw new IllegalStateException(
+          "Could not initialize interpreter (from file system or classpath) 
with " + scriptPath);
+    }
+    return is;
+  }
+
+  /**
+   * Start TajoScriptEngine.
+   *
+   * @param systemConf
+   * @throws IOException
+   */
+  public abstract void start(Configuration systemConf) throws IOException;
+
+  /**
+   * Shutdown TajoScriptEngine.
+   * @throws IOException
+   */
+  public abstract void shutdown();
+
+  /**
+   * Evaluate the input tuple.
+   *
+   * @param input
+   * @return
+   */
+  public abstract Datum eval(Tuple input);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
new file mode 100644
index 0000000..0b3c625
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/* this class is PooledBuffer holder */
+public class BufferPool {
+
+  private static final PooledByteBufAllocator allocator;
+
+  private BufferPool() {
+  }
+
+  static {
+    //TODO we need determine the default params
+    allocator = new 
PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+
+    /* if you are finding memory leak, please enable this line */
+    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+  }
+
+  public static long maxDirectMemory() {
+    return PlatformDependent.maxDirectMemory();
+  }
+
+
+  public static ByteBuf directBuffer(int size) {
+    return allocator.directBuffer(size);
+  }
+
+  /**
+   *
+   * @param size the initial capacity
+   * @param max the max capacity
+   * @return allocated ByteBuf from pool
+   */
+  public static ByteBuf directBuffer(int size, int max) {
+    return allocator.directBuffer(size, max);
+  }
+
+  @InterfaceStability.Unstable
+  public static void forceRelease(ByteBuf buf) {
+    buf.release(buf.refCnt());
+  }
+
+  /**
+   * the ByteBuf will increase to writable size
+   * @param buf
+   * @param minWritableBytes required minimum writable size
+   */
+  public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
+    buf.ensureWritable(minWritableBytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java
new file mode 100644
index 0000000..daf2357
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+
+public class ByteBufInputChannel extends AbstractInterruptibleChannel 
implements ScatteringByteChannel {
+
+  ByteBufferReadable byteBufferReadable;
+  ReadableByteChannel channel;
+  InputStream inputStream;
+
+  public ByteBufInputChannel(InputStream inputStream) {
+    if (inputStream instanceof ByteBufferReadable) {
+      this.byteBufferReadable = (ByteBufferReadable) inputStream;
+    } else {
+      this.channel = Channels.newChannel(inputStream);
+    }
+
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts) {
+    return read(dsts, 0, dsts.length);
+  }
+
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    if (byteBufferReadable != null) {
+      return byteBufferReadable.read(dst);
+    } else {
+      return channel.read(dst);
+    }
+  }
+
+  @Override
+  protected void implCloseChannel() throws IOException {
+    FileUtil.cleanup(null, channel, inputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java
new file mode 100644
index 0000000..f100931
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Reads text lines from a {@link ByteBufInputChannel}, buffering through a
 * pooled {@link ByteBuf}. Lines may be terminated by CR, LF, or CRLF.
 */
public class ByteBufLineReader implements Closeable {
  private static int DEFAULT_BUFFER = 64 * 1024;

  private int bufferSize;      // current buffer capacity; grown when a line outgrows it
  private long readBytes;      // total bytes pulled from the channel so far
  private int startIndex;      // buffer index where the current line begins
  private boolean eof = false; // set once the channel reports end-of-stream
  private ByteBuf buffer;
  private final ByteBufInputChannel channel;
  private final AtomicInteger lineReadBytes = new AtomicInteger();
  private final LineSplitProcessor processor = new LineSplitProcessor();

  public ByteBufLineReader(ByteBufInputChannel channel) {
    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
  }

  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
    this.readBytes = 0;
    this.channel = channel;
    this.buffer = buf;
    this.bufferSize = buf.capacity();
  }

  /** @return bytes consumed from the channel minus what is still unread in the buffer */
  public long readBytes() {
    return readBytes - buffer.readableBytes();
  }

  /** Releases the pooled buffer (if still referenced) and closes the channel. */
  @Override
  public void close() throws IOException {
    if (this.buffer.refCnt() > 0) {
      this.buffer.release();
    }
    this.channel.close();
  }

  /**
   * Reads the next line and decodes it as UTF-8.
   *
   * @return the line without its terminator, or {@code null} at end of input
   */
  public String readLine() throws IOException {
    ByteBuf buf = readLineBuf(lineReadBytes);
    if (buf != null) {
      return buf.toString(CharsetUtil.UTF_8);
    }
    return null;
  }

  /**
   * Compacts the buffer (preserving any unread tail bytes) and refills it from
   * the channel with a single read, doubling capacity when a line is larger
   * than the current buffer.
   */
  private void fillBuffer() throws IOException {

    int tailBytes = 0;
    if (this.readBytes > 0) {
      //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
      this.buffer.markReaderIndex();
      this.buffer.discardReadBytes();  // compact the buffer
      tailBytes = this.buffer.writerIndex();
      if (!this.buffer.isWritable()) {
        // a line is larger than the buffer: grow it
        BufferPool.ensureWritable(buffer, bufferSize * 2);
        this.bufferSize = buffer.capacity();
      }
      this.startIndex = 0;
    }

    boolean release = true;
    try {
      int readBytes = tailBytes;
      // read only once
      int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
      if (localReadBytes < 0) {
        if (buffer.isWritable()) {
          // channel returned -1 while the buffer still had room: no more bytes
          eof = true;
        }
      }
      // NOTE(review): at end-of-stream localReadBytes is -1 and is still added
      // here, so this.readBytes is decremented by one — confirm the byte
      // accounting in readBytes() tolerates this.
      readBytes += localReadBytes;

      this.readBytes += (readBytes - tailBytes);
      release = false;

      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
    } finally {
      // writeBytes failed: give the pooled buffer back instead of leaking it
      if (release) {
        buffer.release();
      }
    }
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF.
   *
   * @param reads out-parameter receiving the number of bytes consumed
   *              (text plus terminator)
   * @return a slice of the internal buffer holding the line without its
   *         terminator, or {@code null} at end of input; the slice is only
   *         valid until the next read
   */
  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
    int readBytes = 0; // newline + text line bytes
    int newlineLength = 0; //length of terminating newline
    int readable;

    this.startIndex = buffer.readerIndex();

    loop:
    while (true) {
      readable = buffer.readableBytes();
      if (readable <= 0) {
        buffer.readerIndex(this.startIndex);
        fillBuffer(); //compact and fill buffer

        //if buffer.writerIndex() is zero, there is no bytes in buffer
        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
          reads.set(0);
          return null;
        } else {
          // a CR ended the previous fill; skip the LF of a CRLF split across fills
          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
            buffer.skipBytes(1);
            if(eof && !buffer.isReadable()) {
              reads.set(1);
              return null;
            }

            newlineLength++;
            readBytes++;
            startIndex = buffer.readerIndex();
          }
        }
        readable = buffer.readableBytes();
      }

      // scan forward for the next CR or LF
      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
      if (endIndex < 0) {
        //no terminating newline appeared in the buffered bytes
        buffer.readerIndex(buffer.writerIndex()); // set to end buffer
        if(eof){
          readBytes += (buffer.readerIndex() - startIndex);
          break loop;
        }
      } else {
        buffer.readerIndex(endIndex + 1);
        readBytes += (buffer.readerIndex() - startIndex); //past newline + text line

        //terminator was CRLF: consume the LF as well
        if (processor.isPrevCharCR() && buffer.isReadable()
            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
          buffer.skipBytes(1);
          readBytes++;
          newlineLength += 2;
        } else {
          newlineLength += 1;
        }
        break loop;
      }
    }
    reads.set(readBytes);
    return buffer.slice(startIndex, readBytes - newlineLength);
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
new file mode 100644
index 0000000..1969d90
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufProcessor;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
/**
 * Deserializes one delimited text line ({@link ByteBuf}) into a {@link Tuple},
 * materializing only the projected columns.
 */
public class CSVLineDeserializer extends TextLineDeserializer {
  private ByteBufProcessor processor;  // scans for the first delimiter byte
  private FieldSerializerDeserializer fieldSerDer;
  private ByteBuf nullChars;           // byte pattern that represents a NULL field
  private int delimiterCompensation;   // delimiter bytes beyond the first one

  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    super(schema, meta, targetColumnIndexes);
  }

  /**
   * (Re)initializes the delimiter scanner, null pattern, and per-field
   * deserializer from the table meta. Safe to call again: a previously held
   * null-chars buffer is released before being replaced.
   */
  @Override
  public void init() {
    byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta);
    // the processor matches only the first delimiter byte; for multi-byte
    // delimiters the remaining bytes are trimmed via delimiterCompensation
    this.processor = new FieldSplitProcessor(delimiter[0]);
    this.delimiterCompensation = delimiter.length - 1;

    if (nullChars != null) {
      nullChars.release();
    }
    nullChars = TextLineSerDe.getNullChars(meta);

    fieldSerDer = new TextFieldSerializerDeserializer(meta);
  }

  /**
   * Parses {@code lineBuf} and puts the projected column values into
   * {@code output}. Non-projected columns are scanned past but never
   * deserialized; a null line or empty projection is a no-op.
   *
   * <p>NOTE(review): fieldLength = end - start - delimiterCompensation treats
   * {@code end} as the index of the delimiter's *last* byte, while the
   * processor halts on the *first* byte — verify multi-byte delimiters.
   *
   * @throws TextLineParsingError if a field cannot be parsed as its column type
   */
  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
    int[] projection = targetColumnIndexes;
    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
      return;
    }

    final int rowLength = lineBuf.readableBytes();
    int start = 0, fieldLength = 0, end = 0;

    //Projection
    int currentTarget = 0;  // position within the projection list
    int currentIndex = 0;   // physical column index within the line

    while (end != -1) {
      // find the next delimiter byte; -1 means this is the last field
      end = lineBuf.forEachByte(start, rowLength - start, processor);

      if (end < 0) {
        fieldLength = rowLength - start;
      } else {
        fieldLength = end - start - delimiterCompensation;
      }

      // deserialize only when this column is the next projected one
      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
        lineBuf.setIndex(start, start + fieldLength);
        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
        output.put(currentIndex, datum);
        currentTarget++;
      }

      // all projected columns filled: skip the rest of the line
      if (projection.length == currentTarget) {
        break;
      }

      start = end + 1;
      currentIndex++;
    }
  }

  /** Releases the pooled null-chars buffer. */
  @Override
  public void release() {
    if (nullChars != null) {
      nullChars.release();
      nullChars = null;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
new file mode 100644
index 0000000..566e5c9
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
+
+public class CSVLineSerDe extends TextLineSerDe {
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta 
meta, int[] targetColumnIndexes) {
+    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    return new CSVLineSerializer(schema, meta);
+  }
+
+  public static byte[] getFieldDelimiter(TableMeta meta) {
+    return 
StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
+        
StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(Bytes.UTF8_CHARSET);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
new file mode 100644
index 0000000..e1c7375
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.AnyDatum;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
/**
 * Serializes a {@link Tuple} into one delimited text record, prefixing every
 * field value with a one-letter type tag.
 */
public class CSVLineSerializer extends TextLineSerializer {
  private FieldSerializerDeserializer serde;

  private byte[] nullChars;  // bytes emitted for a NULL field value
  private byte[] delimiter;  // bytes emitted between fields
  private int columnNum;

  // separator written between a field's type tag and its value
  private final static String PARAM_DELIM = "|\t_";

  public CSVLineSerializer(Schema schema, TableMeta meta) {
    super(schema, meta);
  }

  @Override
  public void init() {
    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
    // NOTE(review): the field delimiter is hard-coded here rather than taken
    // from the table meta (cf. CSVLineSerDe.getFieldDelimiter) — confirm intended.
    delimiter = "|,_".getBytes();
    columnNum = schema.size();

    serde = new TextFieldSerializerDeserializer(meta);
  }

  /**
   * Writes one tuple: each field is emitted as {@code <type-tag>} +
   * {@code PARAM_DELIM} + value, with {@code delimiter} between fields.
   *
   * @return number of bytes written for field values and inter-field
   *         delimiters (type tags and PARAM_DELIM bytes are not counted)
   * @throws IOException on write failure
   */
  @Override
  public int serialize(OutputStream out, Tuple input) throws IOException {
    int writtenBytes = 0;

    for (int i = 0; i < columnNum; i++) {
      Datum datum = input.get(i);
      String typeStr;
      if (datum.type() == TajoDataTypes.Type.ANY) {
        // unwrap ANY so the tag reflects the actual value's type
        typeStr = getTypeString(((AnyDatum)datum).getActual());
      } else {
        typeStr = getTypeString(datum);
      }
      // NOTE(review): getBytes() uses the platform default charset; both
      // strings are ASCII so this is benign, but an explicit charset is safer.
      out.write(typeStr.getBytes());
      out.write(PARAM_DELIM.getBytes());

      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);

      // no trailing delimiter after the last column
      if (columnNum - 1 > i) {
        out.write(delimiter);
        writtenBytes += delimiter.length;
      }
    }

    return writtenBytes;
  }

  @Override
  public void release() {
    // nothing to release: all state is plain arrays
  }

  /**
   * Maps a datum's type to the one-letter tag written before each field
   * (presumably consumed by the external script side — verify against the
   * controller that parses these records).
   */
  private static String getTypeString(Datum val) {
    switch (val.type()) {
      case NULL_TYPE:
        return "-";
      case BOOLEAN:
        return "B";
      case INT1:
      case INT2:
      case INT4:
        return "I";
      case INT8:
        return "L";
      case FLOAT4:
        return "F";
      case FLOAT8:
        return "D";
      case NUMERIC:
        return "E";
      case CHAR:
      case TEXT:
        return "C";
      case DATE:
      case TIME:
      case TIMESTAMP:
        return "T";
      case BLOB:
      case INET4:
      case INET6:
        return "A";
      default:
        throw new UnsupportedException(val.type().name());
    }
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
new file mode 100644
index 0000000..29a5aa2
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
/**
 * (De)serializes a single field (one column value) of a text-formatted row.
 */
public interface FieldSerializerDeserializer {

  /**
   * Write one datum to {@code out} in its text form.
   *
   * @param out stream to write to
   * @param datum value to serialize
   * @param col column metadata for the value
   * @param columnIndex position of the column within the row
   * @param nullChars bytes to emit for a null value
   * @return the number of bytes written
   * @throws IOException on write failure
   */
  int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;

  /**
   * Parse one datum from {@code buf}.
   *
   * @param buf buffer positioned at the field's bytes
   * @param col column metadata for the value
   * @param columnIndex position of the column within the row
   * @param nullChars byte sequence that represents a null value
   * @return the parsed datum
   * @throws IOException on read failure
   * @throws TextLineParsingError if the bytes cannot be parsed as the column's type
   */
  Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
      throws IOException, TextLineParsingError;

}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSplitProcessor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSplitProcessor.java
new file mode 100644
index 0000000..2e89d39
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSplitProcessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+  private byte delimiter; //the ascii separate character
+
+  public FieldSplitProcessor(byte recordDelimiterByte) {
+    this.delimiter = recordDelimiterByte;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    return delimiter != value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
new file mode 100644
index 0000000..dcc53c1
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link InputHandler} is responsible for handling the input to the 
Tajo-Streaming external command.
+ *
+ */
+public class InputHandler implements Closeable {
+
+  private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes();
+  private final static byte[] END_OF_STREAM = ("C" + "\\x04" + 
"|_\n").getBytes();
+
+  private final TextLineSerializer serializer;
+
+  private OutputStream out;
+
+  // flag to mark if close() has already been called
+  private boolean alreadyClosed = false;
+
+  public InputHandler(TextLineSerializer serializer) {
+    this.serializer = serializer;
+  }
+
+  /**
+   * Send the given input <code>Tuple</code> to the managed executable.
+   *
+   * @param t input <code>Tuple</code>
+   * @throws IOException
+   */
+  public void putNext(Tuple t) throws IOException {
+    serializer.serialize(out, t);
+    out.write(END_OF_RECORD_DELIM);
+  }
+
+  public void close() throws IOException {
+    if (!alreadyClosed) {
+      out.flush();
+      out.close();
+      out = null;
+      alreadyClosed = true;
+    }
+  }
+
+  /**
+   * Bind the <code>InputHandler</code> to the <code>OutputStream</code>
+   * from which it reads input and sends it to the managed process.
+   *
+   * @param os <code>OutputStream</code> from which to read input data for the
+   *           managed process
+   * @throws IOException
+   */
+  public void bindTo(OutputStream os) throws IOException {
+    out = os;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java
new file mode 100644
index 0000000..0415141
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+  public static final byte CR = '\r';
+  public static final byte LF = '\n';
+  private boolean prevCharCR = false; //true of prev char was CR
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    switch (value) {
+      case LF:
+        return false;
+      case CR:
+        prevCharCR = true;
+        return false;
+      default:
+        prevCharCR = false;
+        return true;
+    }
+  }
+
+  public boolean isPrevCharCR() {
+    return prevCharCR;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
new file mode 100644
index 0000000..c5eb419
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link OutputHandler} is responsible for handling the output of the
+ * Tajo-Streaming external command.
+ */
+public class OutputHandler implements Closeable {
+  private static int DEFAULT_BUFFER = 64 * 1024;
+  private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes();
+
+  private final TextLineDeserializer deserializer;
+
+  private ByteBufLineReader in = null;
+
+  private String currValue = null;
+
+  private InputStream istream;
+
+  private final ByteBuf buf = BufferPool.directBuffer(DEFAULT_BUFFER);
+
+  // Both of these ignore the trailing "\n".  So if the default delimiter is 
"\n", recordDelimStr is "".
+  private String recordDelimStr = null;
+  private int recordDelimLength = 0;
+  private final Tuple tuple = new VTuple(1);
+
+  // flag to mark if close() has already been called
+  private boolean alreadyClosed = false;
+
+  public OutputHandler(TextLineDeserializer deserializer) {
+    this.deserializer = deserializer;
+  }
+
+  /**
+   * Bind the <code>OutputHandler</code> to the <code>InputStream</code>
+   * from which to read the output data of the managed process.
+   *
+   * @param is <code>InputStream</code> from which to read the output data
+   *           of the managed process
+   * @throws IOException
+   */
+  public void bindTo(InputStream is) throws IOException {
+    this.istream  = is;
+    this.in = new ByteBufLineReader(new ByteBufInputChannel(istream));
+  }
+
+  /**
+   * Get the next output <code>Tuple</code> of the managed process.
+   *
+   * @return the next output <code>Tuple</code> of the managed process
+   * @throws IOException
+   */
+  public Tuple getNext() throws IOException {
+    if (in == null) {
+      return null;
+    }
+
+    currValue = null;
+    if (!readValue()) {
+      return null;
+    }
+    buf.clear();
+    buf.writeBytes(currValue.getBytes());
+    try {
+      deserializer.deserialize(buf, tuple);
+    } catch (TextLineParsingError textLineParsingError) {
+      throw new IOException(textLineParsingError);
+    }
+    return tuple;
+  }
+
+  private boolean readValue() throws IOException {
+    currValue = in.readLine();
+    if (currValue == null) {
+      return false;
+    }
+
+    while(!isEndOfRow()) {
+      // Need to add back the newline character we ate.
+      currValue += '\n';
+
+      byte[] lineBytes = readNextLine();
+      if (lineBytes == null) {
+        // We have no more input, so just break;
+        break;
+      }
+      currValue += new String(lineBytes);
+    }
+
+    if (currValue.contains("|_")) {
+      int pos = currValue.lastIndexOf("|_");
+      currValue = currValue.substring(0, pos);
+    }
+
+    return true;
+  }
+
+  private byte[] readNextLine() throws IOException {
+    String line = in.readLine();
+    if (line == null) {
+      return null;
+    }
+
+    return line.getBytes();
+  }
+
+  private boolean isEndOfRow() {
+    if (recordDelimStr == null) {
+      byte[] recordDelimBa = END_OF_RECORD_DELIM;
+      recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
+      recordDelimStr = new String(recordDelimBa, 0, recordDelimLength,  
Charsets.UTF_8);
+    }
+    if (recordDelimLength == 0 || currValue.length() < recordDelimLength) {
+      return true;
+    }
+    return currValue.contains(recordDelimStr);
+  }
+
+  /**
+   * Close the <code>OutputHandler</code>.
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    if(!alreadyClosed) {
+      istream.close();
+      istream = null;
+      alreadyClosed = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
new file mode 100644
index 0000000..9bb60dd
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.tajo.util.TUtil;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
public class StreamingUtil {

  private static final String BASH = "bash";
  private static final String PATH = "PATH";

  /**
   * Create an external process builder for a StreamingCommand command.
   *
   * The command is run through a shell ("cmd /c" on Windows, "bash -c exec"
   * elsewhere) so that quoting, pipes, and PATH lookup behave as expected.
   *
   * @param argv process arguments
   * @return a configured {@link ProcessBuilder}; the caller starts the process
   */
  public static ProcessBuilder createProcess(String[] argv) {
    // Set the actual command to run with 'bash -c exec ...'
    List<String> cmdArgs = new ArrayList<>();
    String argvAsString = String.join(" ", argv);

    if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
      cmdArgs.add("cmd");
      cmdArgs.add("/c");
      cmdArgs.add(argvAsString);
    } else {
      cmdArgs.add(BASH);
      cmdArgs.add("-c");
      // 'exec' replaces the shell with the command so signals reach it directly.
      cmdArgs.add("exec " + argvAsString);
    }

    // Start the external process
    ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs);
    setupEnvironment(processBuilder);
    return processBuilder;
  }

  /**
   * Set up the run-time environment of the managed process.
   *
   * @param pb {@link ProcessBuilder} used to exec the process
   */
  private static void setupEnvironment(ProcessBuilder pb) {
    // Use the platform path separator (":" on POSIX, ";" on Windows) —
    // a hard-coded ":" would corrupt $PATH on Windows.
    String separator = File.pathSeparator;
    Map<String, String> env = pb.environment();

    // Add the current-working-directory to the $PATH
    File dir = pb.directory();
    String cwd = (dir != null) ? dir.getAbsolutePath() : System
        .getProperty("user.dir");

    String envPath = env.get(PATH);
    if (envPath == null) {
      envPath = cwd;
    } else {
      envPath = envPath + separator + cwd;
    }
    env.put(PATH, envPath);
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..cd9518b
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
+import java.util.TimeZone;
+
+public class TextFieldSerializerDeserializer implements 
FieldSerializerDeserializer {
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
+  private static ProtobufJsonFormat protobufJsonFormat = 
ProtobufJsonFormat.getInstance();
+  private final CharsetDecoder decoder = 
CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+
+  private final boolean hasTimezone;
+  private final TimeZone timezone;
+
+  public TextFieldSerializerDeserializer(TableMeta meta) {
+    hasTimezone = meta.containsOption(StorageConstants.TIMEZONE);
+    timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, 
TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
+  }
+
+  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
+    return !val.isReadable() || nullBytes.equals(val);
+  }
+
+  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
+    return val.readableBytes() > 0 && nullBytes.equals(val);
+  }
+
+  @Override
+  public int serialize(OutputStream out, Datum datum, Column col, int 
columnIndex, byte[] nullChars)
+      throws IOException {
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullChars.length;
+          out.write(nullChars);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        out.write(datum.asBool() ? trueBytes : falseBytes);
+        length = trueBytes.length;
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case INTERVAL:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIME:
+        if (hasTimezone) {
+          bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIMESTAMP:
+        if (hasTimezone) {
+          bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = 
protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+        break;
+      case ANY:
+        AnyDatum anyDatum = (AnyDatum) datum;
+        length = serialize(out, anyDatum.getActual(), new Column("any", 
anyDatum.getActual().type()), 0, nullChars);
+        break;
+      default:
+        throw new UnsupportedException(dataType.getType().name());
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf 
nullChars) throws IOException {
+    Datum datum;
+    TajoDataTypes.Type type = col.getDataType().getType();
+    boolean nullField;
+    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+      nullField = isNullText(buf, nullChars);
+    } else {
+      nullField = isNull(buf, nullChars);
+    }
+
+    if (nullField) {
+      datum = NullDatum.get();
+    } else {
+      switch (type) {
+        case BOOLEAN:
+          byte bool = buf.readByte();
+          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
+          break;
+        case BIT:
+          datum = DatumFactory.createBit(Byte.parseByte(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString()));
+          break;
+        case CHAR:
+          datum = DatumFactory.createChar(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString().trim());
+          break;
+        case INT1:
+        case INT2:
+          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
+          break;
+        case INT4:
+          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
+          break;
+        case INT8:
+          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
+          break;
+        case FLOAT4:
+          datum = DatumFactory.createFloat4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          break;
+        case FLOAT8:
+          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
+          break;
+        case TEXT: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createText(bytes);
+          break;
+        }
+        case DATE:
+          datum = DatumFactory.createDate(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          break;
+        case TIME:
+          if (hasTimezone) {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          }
+          break;
+        case TIMESTAMP:
+          if (hasTimezone) {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          }
+          break;
+        case INTERVAL:
+          datum = DatumFactory.createInterval(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          break;
+        case PROTOBUF: {
+          ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] bytes = new byte[buf.readableBytes()];
+            buf.readBytes(bytes);
+            protobufJsonFormat.merge(bytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          break;
+        }
+        case INET4:
+          datum = DatumFactory.createInet4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), 
buf.readableBytes())).toString());
+          break;
+        case BLOB: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
+          break;
+        }
+        default:
+          datum = NullDatum.get();
+          break;
+      }
+    }
+    return datum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
new file mode 100644
index 0000000..c2ea30c
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Reads a text line and fills a Tuple with values
+ */
/**
 * Reads a text line and fills a Tuple with values
 */
public abstract class TextLineDeserializer {
  /** Schema of the rows to produce; determines the column types used while parsing. */
  protected final Schema schema;
  /** Table metadata carrying format options (e.g. delimiter, null marker). */
  protected final TableMeta meta;
  /** Indexes of the columns that should actually be filled into the output tuple. */
  protected final int[] targetColumnIndexes;

  /**
   * @param schema schema of the rows to deserialize
   * @param meta table metadata carrying format options
   * @param targetColumnIndexes indexes of the columns to materialize
   */
  public TextLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    this.schema = schema;
    this.meta = meta;
    this.targetColumnIndexes = targetColumnIndexes;
  }

  /**
   * Initialize SerDe
   */
  public abstract void init();

  /**
   * It fills a tuple with a read fields in a given line.
   *
   * @param buf Read line
   * @param output Tuple to be filled with read fields
   * @throws IOException
   * @throws TextLineParsingError if the line cannot be parsed into the schema
   */
  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;

  /**
   * Release external resources
   */
  public abstract void release();
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java
new file mode 100644
index 0000000..9048e27
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
/**
 * Thrown when a text line cannot be parsed into the expected schema.
 */
public class TextLineParsingError extends Exception {

  /**
   * @param t the underlying failure; becomes this error's cause
   */
  public TextLineParsingError(Throwable t) {
    super(t);
  }

  /**
   * @param message the offending input line, appended to the message
   * @param t the underlying failure
   */
  public TextLineParsingError(String message, Throwable t) {
    // Pass t as the cause as well — the original dropped it, losing the
    // underlying stack trace.
    super(t.getMessage() + ", Error line: " + message, t);
  }

}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
new file mode 100644
index 0000000..89b169c
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.Bytes;
+
+/**
+ * Pluggable Text Line SerDe class
+ */
+public abstract class TextLineSerDe {
+
+  public TextLineSerDe() {
+  }
+
+  public abstract TextLineDeserializer createDeserializer(Schema schema, 
TableMeta meta, int [] targetColumnIndexes);
+
+  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta 
meta);
+
+  public static ByteBuf getNullChars(TableMeta meta) {
+    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
+
+    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, 
nullCharByteArray.length);
+    nullChars.writeBytes(nullCharByteArray);
+
+    return nullChars;
+  }
+
+  public static byte [] getNullCharsAsBytes(TableMeta meta) {
+    byte [] nullChars;
+
+    String nullCharacters = 
StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+        NullDatum.DEFAULT_TEXT));
+    if (StringUtils.isEmpty(nullCharacters)) {
+      nullChars = NullDatum.get().asTextBytes();
+    } else {
+      nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET);
+    }
+
+    return nullChars;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
new file mode 100644
index 0000000..ea8576c
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function.stream;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Write a Tuple into single text formatted line
+ */
/**
 * Write a Tuple into single text formatted line
 */
public abstract class TextLineSerializer {
  /** Schema of the tuples to serialize; determines per-column formatting. */
  protected Schema schema;
  /** Table metadata carrying format options (e.g. delimiter, null marker). */
  protected TableMeta meta;

  /**
   * @param schema schema of the tuples to serialize
   * @param meta table metadata carrying format options
   */
  public TextLineSerializer(Schema schema, TableMeta meta) {
    this.schema = schema;
    this.meta = meta;
  }

  /** Initialize the serializer before first use. */
  public abstract void init();

  /**
   * Writes one tuple to {@code out} as a single text line.
   *
   * @param out stream to write the formatted line to
   * @param input tuple to serialize
   * @return number of bytes written
   * @throws IOException
   */
  public abstract int serialize(OutputStream out, Tuple input) throws IOException;

  /** Release external resources held by this serializer. */
  public abstract void release();
}

Reply via email to