Repository: tajo Updated Branches: refs/heads/index_support e6b91c333 -> 86c97b2a1
http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java deleted file mode 100644 index 6b2c116..0000000 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.plan.function; - -import com.google.gson.annotations.Expose; -import org.apache.tajo.catalog.FunctionDesc; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.exception.InternalException; -import org.apache.tajo.plan.expr.EvalContext; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.util.TUtil; - -/** - * This class invokes the legacy scala functions. - */ -public class LegacyScalarFunctionInvoke extends FunctionInvoke implements Cloneable { - @Expose private GeneralFunction function; - - public LegacyScalarFunctionInvoke() { - - } - - public LegacyScalarFunctionInvoke(FunctionDesc funcDesc) throws InternalException { - super(funcDesc); - function = (GeneralFunction) funcDesc.newInstance(); - } - - @Override - public void setFunctionDesc(FunctionDesc desc) throws InternalException { - super.setFunctionDesc(desc); - function = (GeneralFunction) functionDesc.newInstance(); - } - - @Override - public void init(FunctionInvokeContext context) { - function.init(context.getQueryContext(), context.getParamTypes()); - } - - @Override - public Datum eval(Tuple tuple) { - return function.eval(tuple); - } - - @Override - public boolean equals(Object o) { - if (o instanceof LegacyScalarFunctionInvoke) { - LegacyScalarFunctionInvoke other = (LegacyScalarFunctionInvoke) o; - return super.equals(other) && - TUtil.checkEquals(function, other.function); - } - return false; - } - - @Override - public int hashCode() { - return function.hashCode(); - } - - @Override - public Object clone() throws CloneNotSupportedException { - LegacyScalarFunctionInvoke clone = (LegacyScalarFunctionInvoke) super.clone(); - clone.function = (GeneralFunction) function.clone(); - return clone; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java new file mode 100644 index 0000000..c7fddf9 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonAggFunctionInvoke.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function; + +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.plan.function.python.PythonScriptEngine; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public class PythonAggFunctionInvoke extends AggFunctionInvoke implements Cloneable { + + private transient PythonScriptEngine scriptEngine; + private transient PythonAggFunctionContext prevContext; + private static int nextContextId = 0; + + /** + * Aggregated result should be kept in Tajo task rather than Python UDAF to control memory usage. + * {@link PythonAggFunctionContext} is to support executing aggregation with keys. + * It stores a snapshot of Python UDAF class instance as a json string. + * + * For each UDAF call with different aggregation key, + * {@link PythonAggFunctionInvoke} calls {@link PythonAggFunctionInvoke#updateContextIfNecessary} to backup and restore + * intermediate aggregation states for the previous key and the current key, respectively. + */ + public static class PythonAggFunctionContext implements FunctionContext { + final int id; // id to identify each context + String jsonData; // snapshot of Python class + + public PythonAggFunctionContext() { + this.id = nextContextId++; + } + + public void setJsonData(String jsonData) { + this.jsonData = jsonData; + } + + public String getJsonData() { + return jsonData; + } + } + + public PythonAggFunctionInvoke(FunctionDesc functionDesc) { + super(functionDesc); + } + + @Override + public void init(FunctionInvokeContext context) throws IOException { + this.scriptEngine = (PythonScriptEngine) context.getScriptEngine(); + } + + @Override + public FunctionContext newContext() { + return new PythonAggFunctionContext(); + } + + /** + * Context does not need to be updated per every UDAF call. + * If the current aggregation key is same with the previous one, + * python-side context doesn't need to be updated because it already contains necessary intermediate result. + * + * @param context + */ + private void updateContextIfNecessary(FunctionContext context) { + PythonAggFunctionContext givenContext = (PythonAggFunctionContext) context; + if (prevContext == null || prevContext.id != givenContext.id) { + try { + if (prevContext != null) { + scriptEngine.updateJavaSideContext(prevContext); + } + scriptEngine.updatePythonSideContext(givenContext); + prevContext = givenContext; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void eval(FunctionContext context, Tuple params) { + updateContextIfNecessary(context); + scriptEngine.callAggFunc(context, params); + } + + @Override + public void merge(FunctionContext context, Tuple params) { + if (params.get(0) instanceof NullDatum) { + return; + } + + updateContextIfNecessary(context); + scriptEngine.callAggFunc(context, params); + } + + @Override + public Datum getPartialResult(FunctionContext context) { + updateContextIfNecessary(context); + // partial results are stored as json strings. + return DatumFactory.createText(scriptEngine.getPartialResult(context)); + } + + @Override + public TajoDataTypes.DataType getPartialResultType() { + return CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT); + } + + @Override + public Datum terminate(FunctionContext context) { + updateContextIfNecessary(context); + return scriptEngine.getFinalResult(context); + } + + @Override + public Object clone() throws CloneNotSupportedException { + // nothing to do + return super.clone(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java index 1019c60..342bd98 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java @@ -47,7 +47,7 @@ public class PythonFunctionInvoke extends FunctionInvoke implements Cloneable { @Override public Datum eval(Tuple tuple) { - Datum res = scriptEngine.eval(tuple); + Datum res = scriptEngine.callScalarFunc(tuple); return res; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 0da30f1..878553f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -27,10 +27,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.FunctionInvocation; -import org.apache.tajo.function.FunctionSignature; -import org.apache.tajo.function.FunctionSupplement; -import org.apache.tajo.function.PythonInvocationDesc; +import org.apache.tajo.function.*; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; @@ -70,25 +69,47 @@ public class PythonScriptEngine extends TajoScriptEngine { Set<FunctionDesc> functionDescs = TUtil.newHashSet(); InputStream in = getScriptAsStream(path); - List<FuncInfo> functions = null; + List<FunctionInfo> functions = null; try { functions = getFunctions(in); } finally { in.close(); } - for(FuncInfo funcInfo : functions) { - TajoDataTypes.DataType returnType = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(funcInfo.returnType)); - FunctionSignature signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, funcInfo.funcName, - returnType, createParamTypes(funcInfo.paramNum)); + for(FunctionInfo funcInfo : functions) { + FunctionSignature signature; FunctionInvocation invocation = new FunctionInvocation(); - PythonInvocationDesc invocationDesc = new PythonInvocationDesc(funcInfo.funcName, path.getPath()); - invocation.setPython(invocationDesc); FunctionSupplement supplement = new FunctionSupplement(); - functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + if (funcInfo instanceof ScalarFuncInfo) { + ScalarFuncInfo scalarFuncInfo = (ScalarFuncInfo) funcInfo; + TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0]; + signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, scalarFuncInfo.funcName, + returnType, createParamTypes(scalarFuncInfo.paramNum)); + PythonInvocationDesc invocationDesc = new PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true); + invocation.setPython(invocationDesc); + functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + } else { + AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo; + if (isValidUdaf(aggFuncInfo)) { + TajoDataTypes.DataType returnType = getReturnTypes(aggFuncInfo.getFinalResultInfo)[0]; + signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, aggFuncInfo.funcName, + returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum)); + PythonInvocationDesc invocationDesc = new PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false); + invocation.setPythonAggregation(invocationDesc); + functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + } + } } return functionDescs; } + private static boolean isValidUdaf(AggFuncInfo aggFuncInfo) { + if (aggFuncInfo.className != null && aggFuncInfo.evalInfo != null && aggFuncInfo.mergeInfo != null + && aggFuncInfo.getPartialResultInfo != null && aggFuncInfo.getFinalResultInfo != null) { + return true; + } + return false; + } + private static TajoDataTypes.DataType[] createParamTypes(int paramNum) { TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[paramNum]; for (int i = 0; i < paramNum; i++) { @@ -97,34 +118,63 @@ public class PythonScriptEngine extends TajoScriptEngine { return paramTypes; } - private static final Pattern pSchema = Pattern.compile("^\\s*\\W+outputType.*"); + private static TajoDataTypes.DataType[] getReturnTypes(ScalarFuncInfo scalarFuncInfo) { + TajoDataTypes.DataType[] returnTypes = new TajoDataTypes.DataType[scalarFuncInfo.returnTypes.length]; + for (int i = 0; i < scalarFuncInfo.returnTypes.length; i++) { + returnTypes[i] = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(scalarFuncInfo.returnTypes[i])); + } + return returnTypes; + } + + private static final Pattern pSchema = Pattern.compile("^\\s*\\W+output_type.*"); private static final Pattern pDef = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+"); + private static final Pattern pClass = Pattern.compile("class.*"); - private static class FuncInfo { - String returnType; + private interface FunctionInfo { + + } + + private static class AggFuncInfo implements FunctionInfo { + String className; + String funcName; + ScalarFuncInfo evalInfo; + ScalarFuncInfo mergeInfo; + ScalarFuncInfo getPartialResultInfo; + ScalarFuncInfo getFinalResultInfo; + } + + private static class ScalarFuncInfo implements FunctionInfo { + String[] returnTypes; String funcName; int paramNum; - public FuncInfo(String returnType, String funcName, int paramNum) { - this.returnType = returnType.toUpperCase(); + public ScalarFuncInfo(String[] quotedSchemaStrings, String funcName, int paramNum) { + this.returnTypes = new String[quotedSchemaStrings.length]; + for (int i = 0; i < quotedSchemaStrings.length; i++) { + String quotedString = quotedSchemaStrings[i].trim(); + String[] tokens = quotedString.substring(1, quotedString.length()-1).split(":"); + this.returnTypes[i] = tokens.length == 1 ? tokens[0].toUpperCase() : tokens[1].toUpperCase(); + } this.funcName = funcName; this.paramNum = paramNum; } } // TODO: python parser must be improved. - private static List<FuncInfo> getFunctions(InputStream is) throws IOException { - List<FuncInfo> functions = TUtil.newList(); + private static List<FunctionInfo> getFunctions(InputStream is) throws IOException { + List<FunctionInfo> functions = TUtil.newList(); InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset()); BufferedReader br = new BufferedReader(in); String line = br.readLine(); - String schemaString = null; + String[] quotedSchemaStrings = null; + AggFuncInfo aggFuncInfo = null; while (line != null) { if (pSchema.matcher(line).matches()) { - int start = line.indexOf("(") + 2; //drop brackets/quotes - int end = line.lastIndexOf(")") - 1; - schemaString = line.substring(start,end).trim(); + int start = line.indexOf("(") + 1; //drop brackets + int end = line.lastIndexOf(")"); + quotedSchemaStrings = line.substring(start,end).trim().split(","); } else if (pDef.matcher(line).matches()) { + boolean isUdaf = aggFuncInfo != null && line.indexOf("def ") > 0; int nameStart = line.indexOf("def ") + "def ".length(); int nameEnd = line.indexOf('('); int signatureEnd = line.indexOf(')'); @@ -135,14 +185,48 @@ public class PythonScriptEngine extends TajoScriptEngine { } else { paramNum = params.length; } + if (params[0].trim().equals("self")) { + paramNum--; + } String functionName = line.substring(nameStart, nameEnd).trim(); - schemaString = schemaString == null ? "blob" : schemaString; - functions.add(new FuncInfo(schemaString, functionName, paramNum)); - schemaString = null; + quotedSchemaStrings = quotedSchemaStrings == null ? new String[] {"'blob'"} : quotedSchemaStrings; + + if (isUdaf) { + if (functionName.equals("eval")) { + aggFuncInfo.evalInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("merge")) { + aggFuncInfo.mergeInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("get_partial_result")) { + aggFuncInfo.getPartialResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } else if (functionName.equals("get_final_result")) { + aggFuncInfo.getFinalResultInfo = new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum); + } + } else { + aggFuncInfo = null; + functions.add(new ScalarFuncInfo(quotedSchemaStrings, functionName, paramNum)); + } + + quotedSchemaStrings = null; + } else if (pClass.matcher(line).matches()) { + // UDAF + if (aggFuncInfo != null) { + functions.add(aggFuncInfo); + } + aggFuncInfo = new AggFuncInfo(); + int classNameStart = line.indexOf("class ") + "class ".length(); + int classNameEnd = line.indexOf("("); + if (classNameEnd < 0) { + classNameEnd = line.indexOf(":"); + } + aggFuncInfo.className = line.substring(classNameStart, classNameEnd).trim(); + aggFuncInfo.funcName = aggFuncInfo.className.toLowerCase(); } line = br.readLine(); } + if (aggFuncInfo != null) { + functions.add(aggFuncInfo); + } br.close(); in.close(); return functions; @@ -162,12 +246,13 @@ public class PythonScriptEngine extends TajoScriptEngine { private static final int PATH_TO_CONTROLLER_FILE = 1; private static final int UDF_FILE_NAME = 2; // Name of file where UDF function is defined private static final int UDF_FILE_PATH = 3; // Path to directory containing file where UDF function is defined - private static final int UDF_NAME = 4; // Name of UDF function being called. - private static final int PATH_TO_FILE_CACHE = 5; // Directory where required files (like tajo_util) are cached on cluster nodes. - private static final int STD_OUT_OUTPUT_PATH = 6; // File for output from when user writes to standard output. - private static final int STD_ERR_OUTPUT_PATH = 7; // File for output from when user writes to standard error. - private static final int CONTROLLER_LOG_FILE_PATH = 8; // Controller log file logs progress through the controller script not user code. - private static final int OUT_SCHEMA = 9; // the schema of the output column + private static final int PATH_TO_FILE_CACHE = 4; // Directory where required files (like tajo_util) are cached on cluster nodes. + private static final int STD_OUT_OUTPUT_PATH = 5; // File for output from when user writes to standard output. + private static final int STD_ERR_OUTPUT_PATH = 6; // File for output from when user writes to standard error. + private static final int CONTROLLER_LOG_FILE_PATH = 7; // Controller log file logs progress through the controller script not user code. + private static final int OUT_SCHEMA = 8; // the schema of the output column + private static final int FUNCTION_OR_CLASS_NAME = 9; // if FUNCTION_TYPE is UDF, function name; if FUNCTION_TYPE is UDAF, class name. + private static final int FUNCTION_TYPE = 10; // UDF or UDAF private Configuration systemConf; @@ -182,36 +267,42 @@ public class PythonScriptEngine extends TajoScriptEngine { private final FunctionSignature functionSignature; private final PythonInvocationDesc invocationDesc; - private final Schema inSchema; - private final Schema outSchema; - private final int [] projectionCols = new int[]{0}; + private Schema inSchema; + private Schema outSchema; + private int[] projectionCols; private final CSVLineSerDe lineSerDe = new CSVLineSerDe(); private final TableMeta pipeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE); - private static final Tuple EMPTY_INPUT = new VTuple(0); + private final Tuple EMPTY_INPUT = new VTuple(0); + private final Schema EMPTY_SCHEMA = new Schema(); public PythonScriptEngine(FunctionDesc functionDesc) { - if (!functionDesc.getInvocation().hasPython()) { + if (!functionDesc.getInvocation().hasPython() && !functionDesc.getInvocation().hasPythonAggregation()) { throw new IllegalStateException("Function type must be 'python'"); } functionSignature = functionDesc.getSignature(); invocationDesc = functionDesc.getInvocation().getPython(); + setSchema(); + } - TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes(); - inSchema = new Schema(); - for (int i = 0; i < paramTypes.length; i++) { - inSchema.addColumn(new Column("in_" + i, paramTypes[i])); + public PythonScriptEngine(FunctionDesc functionDesc, boolean intermediatePhase, boolean finalPhase) { + if (!functionDesc.getInvocation().hasPython() && !functionDesc.getInvocation().hasPythonAggregation()) { + throw new IllegalStateException("Function type must be 'python'"); } - outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); + functionSignature = functionDesc.getSignature(); + invocationDesc = functionDesc.getInvocation().getPython(); + this.intermediatePhase = intermediatePhase; + this.finalPhase = finalPhase; + setSchema(); } @Override public void start(Configuration systemConf) throws IOException { this.systemConf = systemConf; startUdfController(); - createInputHandlers(); setStreams(); + createInputHandlers(); if (LOG.isDebugEnabled()) { LOG.debug("PythonScriptExecutor starts up"); } @@ -220,11 +311,7 @@ public class PythonScriptEngine extends TajoScriptEngine { @Override public void shutdown() { process.destroy(); - FileUtil.cleanup(LOG, stdin); - FileUtil.cleanup(LOG, stdout); - FileUtil.cleanup(LOG, stderr); - FileUtil.cleanup(LOG, inputHandler); - FileUtil.cleanup(LOG, outputHandler); + FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null; inputHandler = null; @@ -245,7 +332,7 @@ public class PythonScriptEngine extends TajoScriptEngine { * @throws IOException */ private String[] buildCommand() throws IOException { - String[] command = new String[10]; + String[] command = new String[11]; // TODO: support controller logging String standardOutputRootWriteLocation = systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(), @@ -269,7 +356,6 @@ public class PythonScriptEngine extends TajoScriptEngine { fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, fileName.length()-3) : fileName; command[UDF_FILE_NAME] = fileName; command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." : filePath.substring(0, lastSeparator - 1); - command[UDF_NAME] = funcName; String fileCachePath = systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname()); if (fileCachePath == null) { throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " must be set."); @@ -279,17 +365,53 @@ public class PythonScriptEngine extends TajoScriptEngine { command[STD_ERR_OUTPUT_PATH] = errOutFileName; command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName; command[OUT_SCHEMA] = outSchema.getColumn(0).getDataType().getType().name().toLowerCase(); + command[FUNCTION_OR_CLASS_NAME] = funcName; + command[FUNCTION_TYPE] = invocationDesc.isScalarFunction() ? "UDF" : "UDAF"; return command; } - private void createInputHandlers() { - TextLineSerializer serializer = lineSerDe.createSerializer(inSchema, pipeMeta); + private void setSchema() { + if (invocationDesc.isScalarFunction()) { + TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes(); + inSchema = new Schema(); + for (int i = 0; i < paramTypes.length; i++) { + inSchema.addColumn(new Column("in_" + i, paramTypes[i])); + } + outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); + } else { + // UDAF + if (!intermediatePhase && !finalPhase) { + // first phase + TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes(); + inSchema = new Schema(); + for (int i = 0; i < paramTypes.length; i++) { + inSchema.addColumn(new Column("in_" + i, paramTypes[i])); + } + outSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); + } else if (intermediatePhase) { + inSchema = outSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); + } else if (finalPhase) { + inSchema = new Schema(new Column[]{new Column("json", TajoDataTypes.Type.TEXT)}); + outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); + } + } + projectionCols = new int[outSchema.size()]; + for (int i = 0; i < outSchema.size(); i++) { + projectionCols[i] = i; + } + } + + private void createInputHandlers() throws IOException { + setSchema(); + TextLineSerializer serializer = lineSerDe.createSerializer(pipeMeta); serializer.init(); this.inputHandler = new InputHandler(serializer); + inputHandler.bindTo(stdin); TextLineDeserializer deserializer = lineSerDe.createDeserializer(outSchema, pipeMeta, projectionCols); deserializer.init(); this.outputHandler = new OutputHandler(deserializer); + outputHandler.bindTo(stdout); } /** @@ -297,13 +419,9 @@ public class PythonScriptEngine extends TajoScriptEngine { * * @throws IOException */ - private void setStreams() throws IOException { + private void setStreams() { stdout = new DataInputStream(new BufferedInputStream(process.getInputStream())); - outputHandler.bindTo(stdout); - stdin = new DataOutputStream(new BufferedOutputStream(process.getOutputStream())); - inputHandler.bindTo(stdin); - stderr = new DataInputStream(new BufferedInputStream(process.getErrorStream())); } @@ -342,16 +460,133 @@ public class PythonScriptEngine extends TajoScriptEngine { return controllerPath; } - public Datum eval(Tuple input) { + /** + * Call Python scalar functions. + * + * @param input input tuple + * @return evaluated result datum + */ + @Override + public Datum callScalarFunc(Tuple input) { try { - if (input == null) { - // When nothing is passed into the UDF the tuple - // being sent is the full tuple for the relation. - // We want it to be nothing (since that's what the user wrote). - input = EMPTY_INPUT; - } + inputHandler.putNext(input, inSchema); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue", e); + } + Datum result; + try { + result = outputHandler.getNext().get(0); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + + return result; + } + + /** + * Call Python aggregation functions. + * + * @param functionContext python function context + * @param input input tuple + */ + @Override + public void callAggFunc(FunctionContext functionContext, Tuple input) { + + String methodName; + if (!intermediatePhase && !finalPhase) { + // eval + methodName = "eval"; + } else { + // merge + methodName = "merge"; + } + + try { + inputHandler.putNext(methodName, input, inSchema); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue while executing " + methodName + " with " + input, e); + } + + try { + outputHandler.getNext(); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + } + + /** + * Restore the intermediate result in Python UDAF with the snapshot stored in the function context. + * + * @param functionContext + */ + public void updatePythonSideContext(PythonAggFunctionContext functionContext) throws IOException { + + try { + inputHandler.putNext("update_context", functionContext); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue", e); + } + try { + outputHandler.getNext(); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + } + + /** + * Get the snapshot of the intermediate result in the Python UDAF. + * + * @param functionContext + */ + public void updateJavaSideContext(PythonAggFunctionContext functionContext) throws IOException { + + try { + inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue", e); + } + try { + outputHandler.getNext(functionContext); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + } + + /** + * Get intermediate result after the first stage. + * + * @param functionContext + * @return + */ + @Override + public String getPartialResult(FunctionContext functionContext) { + try { + inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue", e); + } + try { + return outputHandler.getPartialResultString(); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + } - inputHandler.putNext(input); + /** + * Get final result after the last stage. + * + * @param functionContext + * @return + */ + @Override + public Datum getFinalResult(FunctionContext functionContext) { + try { + inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); } catch (Exception e) { throw new RuntimeException("Failed adding input to inputQueue", e); http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java index 726ec2f..c233fb8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java @@ -19,7 +19,9 @@ package org.apache.tajo.plan.function.python; import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.storage.Tuple; import java.io.*; @@ -30,6 +32,9 @@ import java.net.URI; */ public abstract class TajoScriptEngine { + protected boolean intermediatePhase = false; + protected boolean finalPhase = false; + /** * Open a stream load a script locally or in the classpath * @param scriptPath the path of the script @@ -79,5 +84,19 @@ public abstract class TajoScriptEngine { * @param input * @return */ - public abstract Datum eval(Tuple input); + public abstract Datum callScalarFunc(Tuple input); + + public abstract void callAggFunc(FunctionContext functionContext, Tuple input); + + public abstract String getPartialResult(FunctionContext functionContext); + + public abstract Datum getFinalResult(FunctionContext functionContext); + + public void setIntermediatePhase(boolean flag) { + this.intermediatePhase = flag; + } + + public void setFinalPhase(boolean flag) { + this.finalPhase = flag; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java index 0b3c625..9b65e4b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java @@ -19,24 +19,75 @@ package org.apache.tajo.plan.function.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; import io.netty.util.internal.PlatformDependent; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.CommonTestingUtil; + +import java.lang.reflect.Field; /* this class is PooledBuffer holder */ public class BufferPool { - private static final PooledByteBufAllocator allocator; + public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache"; + private static final ByteBufAllocator ALLOCATOR; private BufferPool() { } static { - //TODO we need determine the default params - allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + /* TODO Enable thread cache + * Create a pooled ByteBuf allocator but disables the thread-local cache. + * Because the TaskRunner thread is newly created + * */ + + if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + TajoConf tajoConf = new TajoConf(); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + } + } + + /** + * borrowed from Spark + */ + public static PooledByteBufAllocator createPooledByteBufAllocator( + boolean allowDirectBufs, + boolean allowCache, + int numCores) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), + Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), + getPrivateStaticField("DEFAULT_PAGE_SIZE"), + getPrivateStaticField("DEFAULT_MAX_ORDER"), + allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 + ); + } - /* if you are finding memory leak, please enable this line */ - //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + /** Used to get defaults from Netty's private static fields. */ + private static int getPrivateStaticField(String name) { + try { + Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(null); + } catch (Exception e) { + throw new RuntimeException(e); + } } public static long maxDirectMemory() { @@ -45,7 +96,7 @@ public class BufferPool { public static ByteBuf directBuffer(int size) { - return allocator.directBuffer(size); + return ALLOCATOR.directBuffer(size); } /** @@ -55,7 +106,7 @@ public class BufferPool { * @return allocated ByteBuf from pool */ public static ByteBuf directBuffer(int size, int max) { - return allocator.directBuffer(size, max); + return ALLOCATOR.directBuffer(size, max); } @InterfaceStability.Unstable http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java index 1969d90..93c1364 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java @@ -23,6 +23,8 @@ import io.netty.buffer.ByteBufProcessor; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -51,6 +53,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { fieldSerDer = new TextFieldSerializerDeserializer(meta); } + @Override public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { int[] projection = targetColumnIndexes; if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { @@ -75,7 +78,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { lineBuf.setIndex(start, start + fieldLength); - Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); + Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex).getDataType(), nullChars); output.put(currentIndex, datum); currentTarget++; } @@ -90,6 +93,18 @@ public class CSVLineDeserializer extends TextLineDeserializer { } @Override + public void deserialize(final ByteBuf lineBuf, FunctionContext context) throws IOException, TextLineParsingError { + PythonAggFunctionContext pythonContext = (PythonAggFunctionContext) context; + if (lineBuf == null) { + return; + } + + byte[] bytes = new byte[lineBuf.readableBytes()]; + lineBuf.readBytes(bytes); + pythonContext.setJsonData(new String(bytes)); + } + + @Override public void release() { if (nullChars != null) { nullChars.release(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java index 566e5c9..e9da196 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java @@ -31,8 +31,8 @@ public class CSVLineSerDe extends TextLineSerDe { } @Override - public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { - return new CSVLineSerializer(schema, meta); + public TextLineSerializer createSerializer(TableMeta meta) { + return new CSVLineSerializer(meta); } public static byte[] getFieldDelimiter(TableMeta meta) { http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java index e1c7375..9db6632 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java @@ -24,6 +24,8 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.AnyDatum; import org.apache.tajo.datum.Datum; import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -34,41 +36,29 @@ public class CSVLineSerializer extends TextLineSerializer { private byte[] nullChars; private byte[] delimiter; - private int columnNum; - private final static String PARAM_DELIM = "|\t_"; + public final static String PARAM_DELIM = "|\t_"; - public CSVLineSerializer(Schema schema, TableMeta meta) { - super(schema, meta); + public CSVLineSerializer(TableMeta meta) { + super(meta); } @Override public void init() { nullChars = TextLineSerDe.getNullCharsAsBytes(meta); delimiter = "|,_".getBytes(); - columnNum = schema.size(); serde = new TextFieldSerializerDeserializer(meta); } @Override - public int serialize(OutputStream out, Tuple input) throws IOException { + public int serialize(OutputStream out, Tuple input, Schema schema) throws IOException { int writtenBytes = 0; - for (int i = 0; i < columnNum; i++) { - Datum datum = input.get(i); - String typeStr; - if (datum.type() == TajoDataTypes.Type.ANY) { - typeStr = getTypeString(((AnyDatum)datum).getActual()); - } else { - typeStr = getTypeString(datum); - } - out.write(typeStr.getBytes()); - out.write(PARAM_DELIM.getBytes()); - - writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); + for (int i = 0; i < input.size(); i++) { + writtenBytes += serializeDatum(out, input.get(i), schema.getColumn(i).getDataType()); - if (columnNum - 1 > i) { + if (input.size() - 1 > i) { out.write(delimiter); writtenBytes += delimiter.length; } @@ -77,12 +67,43 @@ public class CSVLineSerializer extends TextLineSerializer { return writtenBytes; } + private int serializeDatum(OutputStream out, Datum datum, TajoDataTypes.DataType dataType) throws IOException { + String typeStr; + if (datum.type() == TajoDataTypes.Type.ANY) { + typeStr = getTypeString(((AnyDatum)datum).getActual()); + } else { + typeStr = getTypeString(datum); + } + out.write(typeStr.getBytes()); + out.write(PARAM_DELIM.getBytes()); + + return serde.serialize(out, datum, dataType, nullChars); + } + + @Override + public int serializeContext(OutputStream out, FunctionContext context) throws IOException { + int writtenBytes = 0; + PythonAggFunctionContext pythonContext = (PythonAggFunctionContext) context; + + if (pythonContext.getJsonData() == null) { + byte[] bytes = "-".getBytes(); + out.write(bytes); + writtenBytes += bytes.length; + } else { + byte[] bytes = pythonContext.getJsonData().getBytes(); + out.write(bytes); + writtenBytes += bytes.length; + } + + return writtenBytes; + } + @Override public void release() { } - private static String getTypeString(Datum val) { + public static String getTypeString(Datum val) { switch (val.type()) { case NULL_TYPE: return "-"; http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java index 29a5aa2..340772e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java @@ -20,6 +20,7 @@ package org.apache.tajo.plan.function.stream; import io.netty.buffer.ByteBuf; import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import java.io.IOException; @@ -28,9 +29,10 @@ import java.io.OutputStream; public interface FieldSerializerDeserializer { - int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException; + int serialize(OutputStream out, Datum datum, TajoDataTypes.DataType dataType, byte[] nullChars) + throws IOException; - Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) + Datum deserialize(ByteBuf buf, TajoDataTypes.DataType dataType, ByteBuf nullChars) throws IOException, TextLineParsingError; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java index dcc53c1..fd2c3ee 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java @@ -18,6 +18,8 @@ package org.apache.tajo.plan.function.stream; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.plan.function.PythonAggFunctionInvoke; import org.apache.tajo.storage.Tuple; import java.io.Closeable; @@ -50,8 +52,21 @@ public class InputHandler implements Closeable { * @param t input <code>Tuple</code> * @throws IOException */ - public void putNext(Tuple t) throws IOException { - serializer.serialize(out, t); + public void putNext(Tuple t, Schema schema) throws IOException { + serializer.serialize(out, t, schema); + out.write(END_OF_RECORD_DELIM); + } + + public void putNext(String methodName, Tuple t, Schema schema) throws IOException { + String wrappedMethod = "|" + methodName + "_" + CSVLineSerializer.PARAM_DELIM; + out.write(wrappedMethod.getBytes()); + putNext(t, schema); + } + + public void putNext(String methodName, PythonAggFunctionInvoke.PythonAggFunctionContext context) throws IOException { + String wrappedMethod = "|" + methodName + "_" + CSVLineSerializer.PARAM_DELIM; + out.write(wrappedMethod.getBytes()); + serializer.serializeContext(out, context); out.write(END_OF_RECORD_DELIM); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java index c5eb419..1eeb508 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java @@ -20,8 +20,12 @@ package org.apache.tajo.plan.function.stream; import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.FileUtil; import java.io.Closeable; import java.io.IOException; @@ -32,6 +36,8 @@ import java.io.InputStream; * Tajo-Streaming external command. */ public class OutputHandler implements Closeable { + + private static final Log LOG = LogFactory.getLog(OutputHandler.class); private static int DEFAULT_BUFFER = 64 * 1024; private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes(); @@ -43,18 +49,21 @@ public class OutputHandler implements Closeable { private InputStream istream; - private final ByteBuf buf = BufferPool.directBuffer(DEFAULT_BUFFER); + private ByteBuf buf; // Both of these ignore the trailing "\n". So if the default delimiter is "\n", recordDelimStr is "". private String recordDelimStr = null; private int recordDelimLength = 0; - private final Tuple tuple = new VTuple(1); + private final Tuple tuple; // flag to mark if close() has already been called private boolean alreadyClosed = false; + private final String FIELD_DELIM; public OutputHandler(TextLineDeserializer deserializer) { this.deserializer = deserializer; + FIELD_DELIM = new String(CSVLineSerDe.getFieldDelimiter(deserializer.meta)); + tuple = new VTuple(deserializer.schema.size()); } /** @@ -68,6 +77,7 @@ public class OutputHandler implements Closeable { public void bindTo(InputStream is) throws IOException { this.istream = is; this.in = new ByteBufLineReader(new ByteBufInputChannel(istream)); + this.buf = BufferPool.directBuffer(DEFAULT_BUFFER); } /** @@ -95,6 +105,38 @@ public class OutputHandler implements Closeable { return tuple; } + public String getPartialResultString() throws IOException { + if (in == null) { + return null; + } + + currValue = null; + if (!readValue()) { + return null; + } + return currValue; + } + + public FunctionContext getNext(FunctionContext context) throws IOException { + if (in == null) { + return null; + } + + currValue = null; + if (!readValue()) { + return null; + } + buf.clear(); + buf.writeBytes(currValue.getBytes()); + try { + deserializer.deserialize(buf, context); + } catch (TextLineParsingError textLineParsingError) { + throw new IOException(textLineParsingError); + } + + return context; + } + private boolean readValue() throws IOException { currValue = in.readLine(); if (currValue == null) { @@ -113,10 +155,25 @@ public class OutputHandler implements Closeable { currValue += new String(lineBytes); } - if (currValue.contains("|_")) { - int pos = currValue.lastIndexOf("|_"); - currValue = currValue.substring(0, pos); + String line = currValue; + + int candidate = -1; + StringBuffer sb = new StringBuffer(); + while ((candidate=line.indexOf("|")) >= 0) { + if (line.substring(candidate, candidate+2).equals("|_")) { + // record end + sb.append(line.substring(0, candidate)); + break; + } else if (line.substring(candidate, candidate+3).equals("|,_")) { + sb.append(line.substring(0, candidate)).append(FIELD_DELIM); + line = line.substring(candidate+3, line.length()); + } else if (line.substring(candidate, candidate+3).equals("|-_")) { + // null value + sb.append(FIELD_DELIM); + line = line.substring(candidate+3, line.length()); + } } + currValue = sb.toString(); return true; } @@ -148,9 +205,13 @@ public class OutputHandler implements Closeable { */ public void close() throws IOException { if(!alreadyClosed) { - istream.close(); + FileUtil.cleanup(LOG, istream, in); + in = null; istream = null; alreadyClosed = true; + if (this.buf.refCnt() > 0) { + this.buf.release(); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java index 9bb60dd..46558c1 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/StreamingUtil.java @@ -18,6 +18,8 @@ package org.apache.tajo.plan.function.stream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.util.TUtil; import java.io.File; @@ -26,6 +28,8 @@ import java.util.Map; public class StreamingUtil { + private static final Log LOG = LogFactory.getLog(StreamingUtil.class); + private static final String BASH = "bash"; private static final String PATH = "PATH"; @@ -59,6 +63,14 @@ public class StreamingUtil { cmdArgs.add(sb.toString()); } + if (LOG.isDebugEnabled()) { + StringBuffer sb = new StringBuffer("command: "); + for (String cmd : cmdArgs) { + sb.append(cmd).append(" "); + } + LOG.debug(sb.toString()); + } + // Start the external process ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs .toArray(new String[cmdArgs.size()])); http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java index cd9518b..b6d5020 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextFieldSerializerDeserializer.java @@ -23,7 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; import org.apache.commons.codec.binary.Base64; import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; @@ -60,11 +60,10 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali } @Override - public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) + public int serialize(OutputStream out, Datum datum, TajoDataTypes.DataType dataType, byte[] nullChars) throws IOException { byte[] bytes; int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); if (datum == null || datum instanceof NullDatum) { switch (dataType.getType()) { @@ -139,7 +138,8 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali break; case ANY: AnyDatum anyDatum = (AnyDatum) datum; - length = serialize(out, anyDatum.getActual(), new Column("any", anyDatum.getActual().type()), 0, nullChars); + length = serialize(out, anyDatum.getActual(), CatalogUtil.newSimpleDataType(anyDatum.getActual().type()), + nullChars); break; default: throw new UnsupportedException(dataType.getType().name()); @@ -148,9 +148,9 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali } @Override - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { + public Datum deserialize(ByteBuf buf, TajoDataTypes.DataType dataType, ByteBuf nullChars) throws IOException { Datum datum; - TajoDataTypes.Type type = col.getDataType().getType(); + TajoDataTypes.Type type = dataType.getType(); boolean nullField; if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { nullField = isNullText(buf, nullChars); @@ -224,7 +224,7 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeseriali decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); break; case PROTOBUF: { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(dataType); Message.Builder builder = factory.newBuilder(); try { byte[] bytes = new byte[buf.readableBytes()]; http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java index c2ea30c..19f414f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java @@ -21,6 +21,7 @@ package org.apache.tajo.plan.function.stream; import io.netty.buffer.ByteBuf; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -53,6 +54,8 @@ public abstract class TextLineDeserializer { */ public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError; + public abstract void deserialize(final ByteBuf buf, FunctionContext context) throws IOException, TextLineParsingError; + /** * Release external resources */ http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java index 89b169c..8cd3e56 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerDe.java @@ -37,7 +37,7 @@ public abstract class TextLineSerDe { public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); - public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); + public abstract TextLineSerializer createSerializer(TableMeta meta); public static ByteBuf getNullChars(TableMeta meta) { byte[] nullCharByteArray = getNullCharsAsBytes(meta); http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java index ea8576c..71fdd23 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java @@ -20,6 +20,7 @@ package org.apache.tajo.plan.function.stream; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -29,17 +30,17 @@ import java.io.OutputStream; * Write a Tuple into single text formatted line */ public abstract class TextLineSerializer { - protected Schema schema; protected TableMeta meta; - public TextLineSerializer(Schema schema, TableMeta meta) { - this.schema = schema; + public TextLineSerializer(TableMeta meta) { this.meta = meta; } public abstract void init(); - public abstract int serialize(OutputStream out, Tuple input) throws IOException; + public abstract int serialize(OutputStream out, Tuple input, Schema schema) throws IOException; + + public abstract int serializeContext(OutputStream out, FunctionContext context) throws IOException; public abstract void release(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9540f16e/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java index 3ca76ee..80c18cc 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java @@ -186,10 +186,9 @@ public class EvalNodeDeserializer { evalContext.addScriptEngine(current, new PythonScriptEngine(funcDesc)); } } else if (type == EvalType.AGG_FUNCTION || type == EvalType.WINDOW_FUNCTION) { - AggFunction instance = (AggFunction) funcDesc.newInstance(); if (type == EvalType.AGG_FUNCTION) { AggregationFunctionCallEval aggFunc = - new AggregationFunctionCallEval(new FunctionDesc(funcProto.getFuncion()), instance, params); + new AggregationFunctionCallEval(new FunctionDesc(funcProto.getFuncion()), params); PlanProto.AggFunctionEvalSpec aggFunctionProto = protoNode.getAggFunction(); aggFunc.setIntermediatePhase(aggFunctionProto.getIntermediatePhase()); @@ -199,11 +198,16 @@ public class EvalNodeDeserializer { } current = aggFunc; + if (evalContext != null && funcDesc.getInvocation().hasPythonAggregation()) { + evalContext.addScriptEngine(current, new PythonScriptEngine(funcDesc, + aggFunc.isIntermediatePhase(), aggFunc.isFinalPhase())); + } + } else { WinFunctionEvalSpec windowFuncProto = protoNode.getWinFunction(); WindowFunctionEval winFunc = - new WindowFunctionEval(new FunctionDesc(funcProto.getFuncion()), instance, params, + new WindowFunctionEval(new FunctionDesc(funcProto.getFuncion()), params, convertWindowFrame(windowFuncProto.getWindowFrame())); if (windowFuncProto.getSortSpecCount() > 0) {
