http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java new file mode 100644 index 0000000..728ae10 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function; + +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.exception.InternalException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.expr.EvalContext; +import org.apache.tajo.storage.Tuple; + +import java.io.Closeable; +import java.io.IOException; + +/** + * An abstract class for actual function invocation. + * The metadata for function invocation are stored in the {@link org.apache.tajo.function.FunctionInvocation} class. 
+ */ +public abstract class FunctionInvoke implements Cloneable { + @Expose protected FunctionDesc functionDesc; + + public FunctionInvoke() { + + } + + public FunctionInvoke(FunctionDesc functionDesc) { + this.functionDesc = functionDesc; + } + + public static FunctionInvoke newInstance(FunctionDesc desc) throws InternalException { + if (desc.getInvocation().hasLegacy()) { + return new LegacyScalarFunctionInvoke(desc); + } else if (desc.getInvocation().hasPython()) { + return new PythonFunctionInvoke(desc); + } else { + throw new UnsupportedException(desc.getInvocation() + " is not supported"); + } + } + + public void setFunctionDesc(FunctionDesc functionDesc) throws InternalException { + this.functionDesc = functionDesc; + } + + public abstract void init(FunctionInvokeContext context) throws IOException; + + /** + * Evaluate the given tuple with a function + * @param tuple a tuple evaluated with parameters + * @return a result of a fuction execution + */ + public abstract Datum eval(Tuple tuple); + + @Override + public boolean equals(Object o) { + if (o instanceof FunctionInvoke) { + FunctionInvoke other = (FunctionInvoke) o; + return this.functionDesc.equals(other.functionDesc); + } + return false; + } + + @Override + public int hashCode() { + return functionDesc.hashCode(); + } + + @Override + public Object clone() throws CloneNotSupportedException { + FunctionInvoke clone = (FunctionInvoke) super.clone(); + clone.functionDesc = (FunctionDesc) this.functionDesc.clone(); + return clone; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java new file mode 100644 index 0000000..b938072 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvokeContext.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function; + +import com.google.common.base.Objects; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.plan.expr.FunctionEval; +import org.apache.tajo.plan.function.python.TajoScriptEngine; + +import java.util.Arrays; + +/** + * This class contains some metadata need to execute functions. 
+ */ +public class FunctionInvokeContext { + private final OverridableConf queryContext; + private final FunctionEval.ParamType[] paramTypes; + private TajoScriptEngine scriptEngine; + + public FunctionInvokeContext(OverridableConf queryContext, FunctionEval.ParamType[] paramTypes) { + this.queryContext = queryContext; + this.paramTypes = paramTypes; + } + + public OverridableConf getQueryContext() { + return queryContext; + } + + public FunctionEval.ParamType[] getParamTypes() { + return paramTypes; + } + + public void setScriptEngine(TajoScriptEngine scriptEngine) { + this.scriptEngine = scriptEngine; + } + + public boolean hasScriptExecutor() { + return scriptEngine != null; + } + + public TajoScriptEngine getScriptEngine() { + return scriptEngine; + } + + @Override + public int hashCode() { + return Objects.hashCode(queryContext, Arrays.hashCode(paramTypes)); + } + + @Override + public boolean equals(Object o) { + if (o instanceof FunctionInvokeContext) { + FunctionInvokeContext other = (FunctionInvokeContext) o; + return queryContext.equals(other.queryContext) && Arrays.equals(paramTypes, other.paramTypes); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java new file mode 100644 index 0000000..6b2c116 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/LegacyScalarFunctionInvoke.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function; + +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.exception.InternalException; +import org.apache.tajo.plan.expr.EvalContext; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.TUtil; +
/**
 * This class invokes the legacy scalar functions.
 */
public class LegacyScalarFunctionInvoke extends FunctionInvoke implements Cloneable {
  /** The function instance created from the descriptor; {@code null} after the no-arg constructor. */
  @Expose private GeneralFunction function;

  /**
   * Creates an invoker that is not yet bound to a function.
   * {@link #setFunctionDesc(FunctionDesc)} binds it afterwards.
   */
  public LegacyScalarFunctionInvoke() {

  }

  /**
   * Creates an invoker and instantiates the function described by {@code funcDesc}.
   *
   * @param funcDesc descriptor of the legacy function
   * @throws InternalException if the function cannot be instantiated
   */
  public LegacyScalarFunctionInvoke(FunctionDesc funcDesc) throws InternalException {
    super(funcDesc);
    function = (GeneralFunction) funcDesc.newInstance();
  }

  /**
   * Replaces the descriptor and instantiates the function it describes.
   *
   * @param desc the new descriptor
   * @throws InternalException if the function cannot be instantiated
   */
  @Override
  public void setFunctionDesc(FunctionDesc desc) throws InternalException {
    super.setFunctionDesc(desc);
    function = (GeneralFunction) functionDesc.newInstance();
  }

  /** Initializes the wrapped function with the query context and parameter types of {@code context}. */
  @Override
  public void init(FunctionInvokeContext context) {
    function.init(context.getQueryContext(), context.getParamTypes());
  }

  /** Delegates the evaluation of {@code tuple} to the wrapped function. */
  @Override
  public Datum eval(Tuple tuple) {
    return function.eval(tuple);
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof LegacyScalarFunctionInvoke) {
      LegacyScalarFunctionInvoke other = (LegacyScalarFunctionInvoke) o;
      return super.equals(other) &&
          TUtil.checkEquals(function, other.function);
    }
    return false;
  }

  /**
   * Hash of the wrapped function, or 0 when no function is bound.
   * equals() already accepts an unbound function, so hashCode() must not fail on it either.
   */
  @Override
  public int hashCode() {
    return function == null ? 0 : function.hashCode();
  }

  /** Returns a copy holding its own clone of the wrapped function (if one is bound). */
  @Override
  public Object clone() throws CloneNotSupportedException {
    LegacyScalarFunctionInvoke clone = (LegacyScalarFunctionInvoke) super.clone();
    if (function != null) {
      clone.function = (GeneralFunction) function.clone();
    }
    return clone;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java new file mode 100644 index 0000000..1019c60 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/PythonFunctionInvoke.java @@ -0,0 +1,59 @@ +/* + * Lisensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function; + +import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.function.python.PythonScriptEngine; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +
/**
 * Function invoker that hands every tuple to a python script engine.
 */
public class PythonFunctionInvoke extends FunctionInvoke implements Cloneable {

  /** Engine taken from the invocation context in {@link #init(FunctionInvokeContext)}. */
  private transient PythonScriptEngine scriptEngine;

  /** Creates an invoker without a function descriptor. */
  public PythonFunctionInvoke() {

  }

  /**
   * @param functionDesc descriptor of the python function
   */
  public PythonFunctionInvoke(FunctionDesc functionDesc) {
    super(functionDesc);
  }

  /**
   * Picks up the script engine stored in the given context.
   * The engine may be {@code null} if the context carries none.
   */
  @Override
  public void init(FunctionInvokeContext context) throws IOException {
    scriptEngine = (PythonScriptEngine) context.getScriptEngine();
  }

  /** Evaluates the tuple with the script engine obtained in {@code init}. */
  @Override
  public Datum eval(Tuple tuple) {
    return scriptEngine.eval(tuple);
  }

  /** Relies entirely on the parent's clone; this class adds no state that needs copying. */
  @Override
  public Object clone() throws CloneNotSupportedException {
    return super.clone();
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java new file mode 100644 index 0000000..0da30f1 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.python; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.function.FunctionInvocation; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.function.FunctionSupplement; +import org.apache.tajo.function.PythonInvocationDesc; +import org.apache.tajo.plan.function.stream.*; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.TUtil; + +import java.io.*; +import java.net.URI; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * {@link PythonScriptEngine} is responsible for registering python functions and maintaining the controller process. + * The controller is a python process that executes the python UDFs. + * (Please refer to 'tajo-core/src/main/resources/python/controller.py') + * Data are exchanged via standard I/O between PythonScriptEngine and the controller. 
+ */ +public class PythonScriptEngine extends TajoScriptEngine { + + private static final Log LOG = LogFactory.getLog(PythonScriptEngine.class); + + public static final String FILE_EXTENSION = ".py"; + + /** + * Register functions defined in a python script + * + * @param path path to the python script file + * @param namespace namespace where the functions will be defined + * @return set of function descriptions + * @throws IOException + */ + public static Set<FunctionDesc> registerFunctions(URI path, String namespace) throws IOException { + // TODO: we should support the namespace for python functions. + + Set<FunctionDesc> functionDescs = TUtil.newHashSet(); + + InputStream in = getScriptAsStream(path); + List<FuncInfo> functions = null; + try { + functions = getFunctions(in); + } finally { + in.close(); + } + for(FuncInfo funcInfo : functions) { + TajoDataTypes.DataType returnType = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.valueOf(funcInfo.returnType)); + FunctionSignature signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, funcInfo.funcName, + returnType, createParamTypes(funcInfo.paramNum)); + FunctionInvocation invocation = new FunctionInvocation(); + PythonInvocationDesc invocationDesc = new PythonInvocationDesc(funcInfo.funcName, path.getPath()); + invocation.setPython(invocationDesc); + FunctionSupplement supplement = new FunctionSupplement(); + functionDescs.add(new FunctionDesc(signature, invocation, supplement)); + } + return functionDescs; + } + + private static TajoDataTypes.DataType[] createParamTypes(int paramNum) { + TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[paramNum]; + for (int i = 0; i < paramNum; i++) { + paramTypes[i] = TajoDataTypes.DataType.newBuilder().setType(TajoDataTypes.Type.ANY).build(); + } + return paramTypes; + } + + private static final Pattern pSchema = Pattern.compile("^\\s*\\W+outputType.*"); + private static final Pattern pDef = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+"); + + 
private static class FuncInfo { + String returnType; + String funcName; + int paramNum; + + public FuncInfo(String returnType, String funcName, int paramNum) { + this.returnType = returnType.toUpperCase(); + this.funcName = funcName; + this.paramNum = paramNum; + } + } + + // TODO: python parser must be improved. + private static List<FuncInfo> getFunctions(InputStream is) throws IOException { + List<FuncInfo> functions = TUtil.newList(); + InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset()); + BufferedReader br = new BufferedReader(in); + String line = br.readLine(); + String schemaString = null; + while (line != null) { + if (pSchema.matcher(line).matches()) { + int start = line.indexOf("(") + 2; //drop brackets/quotes + int end = line.lastIndexOf(")") - 1; + schemaString = line.substring(start,end).trim(); + } else if (pDef.matcher(line).matches()) { + int nameStart = line.indexOf("def ") + "def ".length(); + int nameEnd = line.indexOf('('); + int signatureEnd = line.indexOf(')'); + String[] params = line.substring(nameEnd+1, signatureEnd).split(","); + int paramNum; + if (params.length == 1) { + paramNum = params[0].equals("") ? 0 : 1; + } else { + paramNum = params.length; + } + + String functionName = line.substring(nameStart, nameEnd).trim(); + schemaString = schemaString == null ? "blob" : schemaString; + functions.add(new FuncInfo(schemaString, functionName, paramNum)); + schemaString = null; + } + line = br.readLine(); + } + br.close(); + in.close(); + return functions; + } + + + private static final String PYTHON_LANGUAGE = "python"; + private static final String PYTHON_ROOT_PATH = "/python"; + private static final String TAJO_UTIL_NAME = "tajo_util.py"; + private static final String CONTROLLER_NAME = "controller.py"; + private static final String PYTHON_CONTROLLER_JAR_PATH = PYTHON_ROOT_PATH + File.separator + CONTROLLER_NAME; // Relative to root of tajo jar. 
+ private static final String PYTHON_TAJO_UTIL_PATH = PYTHON_ROOT_PATH + File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar. + private static final String DEFAULT_LOG_DIR = "/tmp/tajo-" + System.getProperty("user.name") + "/python"; + + // Indexes for arguments being passed to external process + private static final int UDF_LANGUAGE = 0; + private static final int PATH_TO_CONTROLLER_FILE = 1; + private static final int UDF_FILE_NAME = 2; // Name of file where UDF function is defined + private static final int UDF_FILE_PATH = 3; // Path to directory containing file where UDF function is defined + private static final int UDF_NAME = 4; // Name of UDF function being called. + private static final int PATH_TO_FILE_CACHE = 5; // Directory where required files (like tajo_util) are cached on cluster nodes. + private static final int STD_OUT_OUTPUT_PATH = 6; // File for output from when user writes to standard output. + private static final int STD_ERR_OUTPUT_PATH = 7; // File for output from when user writes to standard error. + private static final int CONTROLLER_LOG_FILE_PATH = 8; // Controller log file logs progress through the controller script not user code. 
+ private static final int OUT_SCHEMA = 9; // the schema of the output column + + private Configuration systemConf; + + private Process process; // Handle to the external execution of python functions + + private InputHandler inputHandler; + private OutputHandler outputHandler; + + private DataOutputStream stdin; // stdin of the process + private InputStream stdout; // stdout of the process + private InputStream stderr; // stderr of the process + + private final FunctionSignature functionSignature; + private final PythonInvocationDesc invocationDesc; + private final Schema inSchema; + private final Schema outSchema; + private final int [] projectionCols = new int[]{0}; + + private final CSVLineSerDe lineSerDe = new CSVLineSerDe(); + private final TableMeta pipeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE); + + private static final Tuple EMPTY_INPUT = new VTuple(0); + + public PythonScriptEngine(FunctionDesc functionDesc) { + if (!functionDesc.getInvocation().hasPython()) { + throw new IllegalStateException("Function type must be 'python'"); + } + functionSignature = functionDesc.getSignature(); + invocationDesc = functionDesc.getInvocation().getPython(); + + TajoDataTypes.DataType[] paramTypes = functionSignature.getParamTypes(); + inSchema = new Schema(); + for (int i = 0; i < paramTypes.length; i++) { + inSchema.addColumn(new Column("in_" + i, paramTypes[i])); + } + outSchema = new Schema(new Column[]{new Column("out", functionSignature.getReturnType())}); + } + + @Override + public void start(Configuration systemConf) throws IOException { + this.systemConf = systemConf; + startUdfController(); + createInputHandlers(); + setStreams(); + if (LOG.isDebugEnabled()) { + LOG.debug("PythonScriptExecutor starts up"); + } + } + + @Override + public void shutdown() { + process.destroy(); + FileUtil.cleanup(LOG, stdin); + FileUtil.cleanup(LOG, stdout); + FileUtil.cleanup(LOG, stderr); + FileUtil.cleanup(LOG, inputHandler); + FileUtil.cleanup(LOG, 
outputHandler); + stdin = null; + stdout = stderr = null; + inputHandler = null; + outputHandler = null; + if (LOG.isDebugEnabled()) { + LOG.debug("PythonScriptExecutor shuts down"); + } + } + + private void startUdfController() throws IOException { + ProcessBuilder processBuilder = StreamingUtil.createProcess(buildCommand()); + process = processBuilder.start(); + } + + /** + * Build a command to execute an external process. + * @return + * @throws IOException + */ + private String[] buildCommand() throws IOException { + String[] command = new String[10]; + + // TODO: support controller logging + String standardOutputRootWriteLocation = systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(), + DEFAULT_LOG_DIR); + if (!standardOutputRootWriteLocation.equals(DEFAULT_LOG_DIR)) { + LOG.warn("Currently, logging is not supported for the python controller."); + } + String controllerLogFileName, outFileName, errOutFileName; + + String funcName = invocationDesc.getName(); + String filePath = invocationDesc.getPath(); + + controllerLogFileName = standardOutputRootWriteLocation + funcName + "_controller.log"; + outFileName = standardOutputRootWriteLocation + funcName + ".out"; + errOutFileName = standardOutputRootWriteLocation + funcName + ".err"; + + command[UDF_LANGUAGE] = PYTHON_LANGUAGE; + command[PATH_TO_CONTROLLER_FILE] = getControllerPath(); + int lastSeparator = filePath.lastIndexOf(File.separator) + 1; + String fileName = filePath.substring(lastSeparator); + fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, fileName.length()-3) : fileName; + command[UDF_FILE_NAME] = fileName; + command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." 
: filePath.substring(0, lastSeparator - 1); + command[UDF_NAME] = funcName; + String fileCachePath = systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname()); + if (fileCachePath == null) { + throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " must be set."); + } + command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'"; + command[STD_OUT_OUTPUT_PATH] = outFileName; + command[STD_ERR_OUTPUT_PATH] = errOutFileName; + command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName; + command[OUT_SCHEMA] = outSchema.getColumn(0).getDataType().getType().name().toLowerCase(); + + return command; + } + + private void createInputHandlers() { + TextLineSerializer serializer = lineSerDe.createSerializer(inSchema, pipeMeta); + serializer.init(); + this.inputHandler = new InputHandler(serializer); + TextLineDeserializer deserializer = lineSerDe.createDeserializer(outSchema, pipeMeta, projectionCols); + deserializer.init(); + this.outputHandler = new OutputHandler(deserializer); + } + + /** + * Get the standard input, output, and error streams of the external process + * + * @throws IOException + */ + private void setStreams() throws IOException { + stdout = new DataInputStream(new BufferedInputStream(process.getInputStream())); + outputHandler.bindTo(stdout); + + stdin = new DataOutputStream(new BufferedOutputStream(process.getOutputStream())); + inputHandler.bindTo(stdin); + + stderr = new DataInputStream(new BufferedInputStream(process.getErrorStream())); + } + + /** + * Find the path to the controller file for the streaming language. + * + * First check path to job jar and if the file is not found (like in the + * case of running hadoop in standalone mode) write the necessary files + * to temporary files and return that path. 
+ * + * @return + * @throws IOException + */ + private String getControllerPath() throws IOException { + String controllerPath = PYTHON_CONTROLLER_JAR_PATH; + File controller = new File(PYTHON_CONTROLLER_JAR_PATH); + if (!controller.exists()) { + File controllerFile = File.createTempFile("controller", FILE_EXTENSION); + InputStream pythonControllerStream = this.getClass().getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH); + try { + FileUtils.copyInputStreamToFile(pythonControllerStream, controllerFile); + } finally { + pythonControllerStream.close(); + } + controllerFile.deleteOnExit(); + File tajoUtilFile = new File(controllerFile.getParent() + File.separator + TAJO_UTIL_NAME); + tajoUtilFile.deleteOnExit(); + InputStream pythonUtilStream = this.getClass().getResourceAsStream(PYTHON_TAJO_UTIL_PATH); + try { + FileUtils.copyInputStreamToFile(pythonUtilStream, tajoUtilFile); + } finally { + pythonUtilStream.close(); + } + controllerPath = controllerFile.getAbsolutePath(); + } + return controllerPath; + } + + public Datum eval(Tuple input) { + try { + if (input == null) { + // When nothing is passed into the UDF the tuple + // being sent is the full tuple for the relation. + // We want it to be nothing (since that's what the user wrote). 
+ input = EMPTY_INPUT; + } + + inputHandler.putNext(input); + stdin.flush(); + } catch (Exception e) { + throw new RuntimeException("Failed adding input to inputQueue", e); + } + Datum result; + try { + result = outputHandler.getNext().get(0); + } catch (Exception e) { + throw new RuntimeException("Problem getting output", e); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java new file mode 100644 index 0000000..726ec2f --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/TajoScriptEngine.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.python; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; + +import java.io.*; +import java.net.URI; +
/**
 * Base class of the script engines that evaluate user-defined functions.
 */
public abstract class TajoScriptEngine {

  /**
   * Opens a stream on a script stored in the local file system.
   *
   * @param scriptPath the location of the script
   * @return a stream on the script; the caller is responsible for closing it
   * @throws IllegalArgumentException if {@code scriptPath} cannot be converted into a file
   * @throws IllegalStateException if the file does not exist or cannot be opened
   */
  protected static InputStream getScriptAsStream(URI scriptPath) {
    final File script;
    try {
      script = new File(scriptPath);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("path: " + scriptPath, e);
    }

    if (!script.exists()) {
      throw new IllegalStateException(
          "Could not initialize interpreter (from file system or classpath) with " + scriptPath);
    }

    try {
      return new FileInputStream(script);
    } catch (FileNotFoundException e) {
      throw new IllegalStateException("could not find existing file "+scriptPath, e);
    }
  }

  /**
   * Starts the engine.
   *
   * @param systemConf the system configuration
   * @throws IOException if the engine cannot be started
   */
  public abstract void start(Configuration systemConf) throws IOException;

  /**
   * Shuts the engine down.
   */
  public abstract void shutdown();

  /**
   * Evaluates the input tuple.
   *
   * @param input the tuple to evaluate
   * @return the result of the evaluation
   */
  public abstract Datum eval(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java new file mode 100644 index 0000000..0b3c625 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.classification.InterfaceStability; +
/* this class is PooledBuffer holder */
public class BufferPool {

  /** The single pooled allocator shared by every caller of this class. */
  private static final PooledByteBufAllocator allocator;

  /** Static utility holder; not meant to be instantiated. */
  private BufferPool() {
  }

  static {
    //TODO we need determine the default params
    allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    /* if you are finding memory leak, please enable this line */
    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
  }

  /** @return the maximum amount of direct memory, as reported by Netty's {@code PlatformDependent} */
  public static long maxDirectMemory() {
    return PlatformDependent.maxDirectMemory();
  }


  /**
   * @param size the initial capacity
   * @return allocated ByteBuf from pool
   */
  public static ByteBuf directBuffer(int size) {
    return allocator.directBuffer(size);
  }

  /**
   *
   * @param size the initial capacity
   * @param max the max capacity
   * @return allocated ByteBuf from pool
   */
  public static ByteBuf directBuffer(int size, int max) {
    return allocator.directBuffer(size, max);
  }

  /**
   * Drops every outstanding reference of the buffer at once.
   * A buffer whose reference count is already zero is left alone, because
   * {@code release} rejects a non-positive decrement.
   *
   * @param buf the buffer to release
   */
  @InterfaceStability.Unstable
  public static void forceRelease(ByteBuf buf) {
    int refCnt = buf.refCnt();
    if (refCnt > 0) {
      buf.release(refCnt);
    }
  }

  /**
   * the ByteBuf will increase to writable size
   * @param buf
   * @param minWritableBytes required minimum writable size
   */
  public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
    buf.ensureWritable(minWritableBytes);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java new file mode 100644 index 0000000..daf2357 --- /dev/null +++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufInputChannel.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.stream; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.tajo.util.FileUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { + + ByteBufferReadable byteBufferReadable; + ReadableByteChannel channel; + InputStream inputStream; + + public ByteBufInputChannel(InputStream inputStream) { + if (inputStream instanceof ByteBufferReadable) { + this.byteBufferReadable = (ByteBufferReadable) inputStream; + } else { + this.channel = Channels.newChannel(inputStream); + } + + this.inputStream = inputStream; + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long 
read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (byteBufferReadable != null) { + return byteBufferReadable.read(dst); + } else { + return channel.read(dst); + } + } + + @Override + protected void implCloseChannel() throws IOException { + FileUtil.cleanup(null, channel, inputStream); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java new file mode 100644 index 0000000..f100931 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/ByteBufLineReader.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class ByteBufLineReader implements Closeable { + private static int DEFAULT_BUFFER = 64 * 1024; + + private int bufferSize; + private long readBytes; + private int startIndex; + private boolean eof = false; + private ByteBuf buffer; + private final ByteBufInputChannel channel; + private final AtomicInteger lineReadBytes = new AtomicInteger(); + private final LineSplitProcessor processor = new LineSplitProcessor(); + + public ByteBufLineReader(ByteBufInputChannel channel) { + this(channel, BufferPool.directBuffer(DEFAULT_BUFFER)); + } + + public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) { + this.readBytes = 0; + this.channel = channel; + this.buffer = buf; + this.bufferSize = buf.capacity(); + } + + public long readBytes() { + return readBytes - buffer.readableBytes(); + } + + @Override + public void close() throws IOException { + if (this.buffer.refCnt() > 0) { + this.buffer.release(); + } + this.channel.close(); + } + + public String readLine() throws IOException { + ByteBuf buf = readLineBuf(lineReadBytes); + if (buf != null) { + return buf.toString(CharsetUtil.UTF_8); + } + return null; + } + + private void fillBuffer() throws IOException { + + int tailBytes = 0; + if (this.readBytes > 0) { + //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes) + this.buffer.markReaderIndex(); + this.buffer.discardReadBytes(); // compact the buffer + tailBytes = this.buffer.writerIndex(); + if (!this.buffer.isWritable()) { + // a line bytes is large than the buffer + BufferPool.ensureWritable(buffer, bufferSize * 2); + this.bufferSize = buffer.capacity(); + } + this.startIndex = 0; + } + + boolean release = true; + try { + int readBytes = tailBytes; + // read only once + int 
localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes); + if (localReadBytes < 0) { + if (buffer.isWritable()) { + //if read bytes is less than the buffer capacity, there is no more bytes in the channel + eof = true; + } + } + readBytes += localReadBytes; + + this.readBytes += (readBytes - tailBytes); + release = false; + + this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) + } finally { + if (release) { + buffer.release(); + } + } + } + + /** + * Read a line terminated by one of CR, LF, or CRLF. + */ + public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { + int readBytes = 0; // newline + text line bytes + int newlineLength = 0; //length of terminating newline + int readable; + + this.startIndex = buffer.readerIndex(); + + loop: + while (true) { + readable = buffer.readableBytes(); + if (readable <= 0) { + buffer.readerIndex(this.startIndex); + fillBuffer(); //compact and fill buffer + + //if buffer.writerIndex() is zero, there is no bytes in buffer + if (!buffer.isReadable() && buffer.writerIndex() == 0) { + reads.set(0); + return null; + } else { + //skip first newLine + if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { + buffer.skipBytes(1); + if(eof && !buffer.isReadable()) { + reads.set(1); + return null; + } + + newlineLength++; + readBytes++; + startIndex = buffer.readerIndex(); + } + } + readable = buffer.readableBytes(); + } + + int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor); + if (endIndex < 0) { + //does not appeared terminating newline + buffer.readerIndex(buffer.writerIndex()); // set to end buffer + if(eof){ + readBytes += (buffer.readerIndex() - startIndex); + break loop; + } + } else { + buffer.readerIndex(endIndex + 1); + readBytes += (buffer.readerIndex() - startIndex); //past newline + text line + + //appeared terminating CRLF + if (processor.isPrevCharCR() && buffer.isReadable() + && 
buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { + buffer.skipBytes(1); + readBytes++; + newlineLength += 2; + } else { + newlineLength += 1; + } + break loop; + } + } + reads.set(readBytes); + return buffer.slice(startIndex, readBytes - newlineLength); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java new file mode 100644 index 0000000..1969d90 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineDeserializer.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public class CSVLineDeserializer extends TextLineDeserializer { + private ByteBufProcessor processor; + private FieldSerializerDeserializer fieldSerDer; + private ByteBuf nullChars; + private int delimiterCompensation; + + public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + super(schema, meta, targetColumnIndexes); + } + + @Override + public void init() { + byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta); + this.processor = new FieldSplitProcessor(delimiter[0]); + this.delimiterCompensation = delimiter.length - 1; + + if (nullChars != null) { + nullChars.release(); + } + nullChars = TextLineSerDe.getNullChars(meta); + + fieldSerDer = new TextFieldSerializerDeserializer(meta); + } + + public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { + int[] projection = targetColumnIndexes; + if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { + return; + } + + final int rowLength = lineBuf.readableBytes(); + int start = 0, fieldLength = 0, end = 0; + + //Projection + int currentTarget = 0; + int currentIndex = 0; + + while (end != -1) { + end = lineBuf.forEachByte(start, rowLength - start, processor); + + if (end < 0) { + fieldLength = rowLength - start; + } else { + fieldLength = end - start - delimiterCompensation; + } + + if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { + lineBuf.setIndex(start, start + fieldLength); + Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); + output.put(currentIndex, datum); + currentTarget++; + } + + if 
(projection.length == currentTarget) { + break; + } + + start = end + 1; + currentIndex++; + } + } + + @Override + public void release() { + if (nullChars != null) { + nullChars.release(); + nullChars = null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java new file mode 100644 index 0000000..566e5c9 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerDe.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.Bytes; + +public class CSVLineSerDe extends TextLineSerDe { + @Override + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + return new CSVLineDeserializer(schema, meta, targetColumnIndexes); + } + + @Override + public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { + return new CSVLineSerializer(schema, meta); + } + + public static byte[] getFieldDelimiter(TableMeta meta) { + return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(Bytes.UTF8_CHARSET); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java new file mode 100644 index 0000000..e1c7375 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/CSVLineSerializer.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.stream; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.AnyDatum; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.io.OutputStream; + +public class CSVLineSerializer extends TextLineSerializer { + private FieldSerializerDeserializer serde; + + private byte[] nullChars; + private byte[] delimiter; + private int columnNum; + + private final static String PARAM_DELIM = "|\t_"; + + public CSVLineSerializer(Schema schema, TableMeta meta) { + super(schema, meta); + } + + @Override + public void init() { + nullChars = TextLineSerDe.getNullCharsAsBytes(meta); + delimiter = "|,_".getBytes(); + columnNum = schema.size(); + + serde = new TextFieldSerializerDeserializer(meta); + } + + @Override + public int serialize(OutputStream out, Tuple input) throws IOException { + int writtenBytes = 0; + + for (int i = 0; i < columnNum; i++) { + Datum datum = input.get(i); + String typeStr; + if (datum.type() == TajoDataTypes.Type.ANY) { + typeStr = getTypeString(((AnyDatum)datum).getActual()); + } else { + typeStr = getTypeString(datum); + } + out.write(typeStr.getBytes()); + out.write(PARAM_DELIM.getBytes()); + + writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); + + if (columnNum - 1 > i) { + out.write(delimiter); + writtenBytes += delimiter.length; + } + } + + 
return writtenBytes; + } + + @Override + public void release() { + + } + + private static String getTypeString(Datum val) { + switch (val.type()) { + case NULL_TYPE: + return "-"; + case BOOLEAN: + return "B"; + case INT1: + case INT2: + case INT4: + return "I"; + case INT8: + return "L"; + case FLOAT4: + return "F"; + case FLOAT8: + return "D"; + case NUMERIC: + return "E"; + case CHAR: + case TEXT: + return "C"; + case DATE: + case TIME: + case TIMESTAMP: + return "T"; + case BLOB: + case INET4: + case INET6: + return "A"; + default: + throw new UnsupportedException(val.type().name()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java new file mode 100644 index 0000000..29a5aa2 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/FieldSerializerDeserializer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Contract for converting a single column value to and from its serialized form.
 */
public interface FieldSerializerDeserializer {

  /**
   * Writes one column value to {@code out}.
   *
   * @param out destination stream
   * @param datum value to write
   * @param col column the value belongs to
   * @param columnIndex index of the column
   * @param nullChars bytes representing a null value
   * @return presumably the number of bytes written (the CSV line serializer in this package
   *         adds the result to its written-byte total) -- TODO confirm against implementations
   * @throws IOException if writing fails
   */
  int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;

  /**
   * Reads one column value from {@code buf}.
   *
   * @param buf buffer holding the field; NOTE(review): the CSV line deserializer in this
   *            package sets the reader/writer indexes to the field bounds before calling,
   *            so implementations presumably read the readable region -- confirm
   * @param col column the value belongs to
   * @param columnIndex index of the column
   * @param nullChars buffer holding the bytes that represent a null value
   * @return the parsed value
   * @throws IOException if reading fails
   * @throws TextLineParsingError if the field cannot be parsed
   */
  Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
      throws IOException, TextLineParsingError;

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBufProcessor; + +public class FieldSplitProcessor implements ByteBufProcessor { + private byte delimiter; //the ascii separate character + + public FieldSplitProcessor(byte recordDelimiterByte) { + this.delimiter = recordDelimiterByte; + } + + @Override + public boolean process(byte value) throws Exception { + return delimiter != value; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java new file mode 100644 index 0000000..dcc53c1 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/InputHandler.java @@ -0,0 +1,78 @@ +/* + * Lisensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import org.apache.tajo.storage.Tuple; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +/** + * {@link InputHandler} is responsible for handling the input to the Tajo-Streaming external command. + * + */ +public class InputHandler implements Closeable { + + private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes(); + private final static byte[] END_OF_STREAM = ("C" + "\\x04" + "|_\n").getBytes(); + + private final TextLineSerializer serializer; + + private OutputStream out; + + // flag to mark if close() has already been called + private boolean alreadyClosed = false; + + public InputHandler(TextLineSerializer serializer) { + this.serializer = serializer; + } + + /** + * Send the given input <code>Tuple</code> to the managed executable. + * + * @param t input <code>Tuple</code> + * @throws IOException + */ + public void putNext(Tuple t) throws IOException { + serializer.serialize(out, t); + out.write(END_OF_RECORD_DELIM); + } + + public void close() throws IOException { + if (!alreadyClosed) { + out.flush(); + out.close(); + out = null; + alreadyClosed = true; + } + } + + /** + * Bind the <code>InputHandler</code> to the <code>OutputStream</code> + * from which it reads input and sends it to the managed process. 
+ * + * @param os <code>OutputStream</code> from which to read input data for the + * managed process + * @throws IOException + */ + public void bindTo(OutputStream os) throws IOException { + out = os; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java new file mode 100644 index 0000000..0415141 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/LineSplitProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBufProcessor; + +public class LineSplitProcessor implements ByteBufProcessor { + public static final byte CR = '\r'; + public static final byte LF = '\n'; + private boolean prevCharCR = false; //true of prev char was CR + + @Override + public boolean process(byte value) throws Exception { + switch (value) { + case LF: + return false; + case CR: + prevCharCR = true; + return false; + default: + prevCharCR = false; + return true; + } + } + + public boolean isPrevCharCR() { + return prevCharCR; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java new file mode 100644 index 0000000..c5eb419 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/OutputHandler.java @@ -0,0 +1,156 @@ +/* + * Lisensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.function.stream; + +import com.google.common.base.Charsets; +import io.netty.buffer.ByteBuf; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +/** + * {@link OutputHandler} is responsible for handling the output of the + * Tajo-Streaming external command. + */ +public class OutputHandler implements Closeable { + private static int DEFAULT_BUFFER = 64 * 1024; + private final static byte[] END_OF_RECORD_DELIM = "|_\n".getBytes(); + + private final TextLineDeserializer deserializer; + + private ByteBufLineReader in = null; + + private String currValue = null; + + private InputStream istream; + + private final ByteBuf buf = BufferPool.directBuffer(DEFAULT_BUFFER); + + // Both of these ignore the trailing "\n". So if the default delimiter is "\n", recordDelimStr is "". + private String recordDelimStr = null; + private int recordDelimLength = 0; + private final Tuple tuple = new VTuple(1); + + // flag to mark if close() has already been called + private boolean alreadyClosed = false; + + public OutputHandler(TextLineDeserializer deserializer) { + this.deserializer = deserializer; + } + + /** + * Bind the <code>OutputHandler</code> to the <code>InputStream</code> + * from which to read the output data of the managed process. + * + * @param is <code>InputStream</code> from which to read the output data + * of the managed process + * @throws IOException + */ + public void bindTo(InputStream is) throws IOException { + this.istream = is; + this.in = new ByteBufLineReader(new ByteBufInputChannel(istream)); + } + + /** + * Get the next output <code>Tuple</code> of the managed process. 
+ * + * @return the next output <code>Tuple</code> of the managed process + * @throws IOException + */ + public Tuple getNext() throws IOException { + if (in == null) { + return null; + } + + currValue = null; + if (!readValue()) { + return null; + } + buf.clear(); + buf.writeBytes(currValue.getBytes()); + try { + deserializer.deserialize(buf, tuple); + } catch (TextLineParsingError textLineParsingError) { + throw new IOException(textLineParsingError); + } + return tuple; + } + + private boolean readValue() throws IOException { + currValue = in.readLine(); + if (currValue == null) { + return false; + } + + while(!isEndOfRow()) { + // Need to add back the newline character we ate. + currValue += '\n'; + + byte[] lineBytes = readNextLine(); + if (lineBytes == null) { + // We have no more input, so just break; + break; + } + currValue += new String(lineBytes); + } + + if (currValue.contains("|_")) { + int pos = currValue.lastIndexOf("|_"); + currValue = currValue.substring(0, pos); + } + + return true; + } + + private byte[] readNextLine() throws IOException { + String line = in.readLine(); + if (line == null) { + return null; + } + + return line.getBytes(); + } + + private boolean isEndOfRow() { + if (recordDelimStr == null) { + byte[] recordDelimBa = END_OF_RECORD_DELIM; + recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n + recordDelimStr = new String(recordDelimBa, 0, recordDelimLength, Charsets.UTF_8); + } + if (recordDelimLength == 0 || currValue.length() < recordDelimLength) { + return true; + } + return currValue.contains(recordDelimStr); + } + + /** + * Close the <code>OutputHandler</code>. 
/**
 * Helpers for launching the external process that a streaming command runs in.
 */
public class StreamingUtil {

  private static final String BASH = "bash";
  private static final String PATH = "PATH";
  private static final String WINDOWS_PREFIX = "WINDOWS";

  /**
   * Create an external process for StreamingCommand command.
   *
   * The arguments are joined with single spaces (a trailing space is kept) and handed to the
   * platform shell: <code>cmd /c ...</code> on Windows, <code>bash -c "exec ..."</code>
   * elsewhere. The process is configured but not started.
   *
   * NOTE(review): the arguments reach the shell unquoted, so callers must only pass trusted
   * values.
   *
   * @param argv process arguments
   * @return a {@link ProcessBuilder} holding the shell command, with the working directory
   *         appended to its PATH
   */
  public static ProcessBuilder createProcess(String[] argv) {
    StringBuilder argBuilder = new StringBuilder();
    for (String arg : argv) {
      argBuilder.append(arg).append(' ');
    }
    String argvAsString = argBuilder.toString();

    // Set the actual command to run with 'bash -c exec ...'
    String[] command;
    if (isWindows()) {
      command = new String[] {"cmd", "/c", argvAsString};
    } else {
      command = new String[] {BASH, "-c", "exec " + argvAsString};
    }

    ProcessBuilder processBuilder = new ProcessBuilder(command);
    setupEnvironment(processBuilder);
    return processBuilder;
  }

  /**
   * Detects Windows from the <code>os.name</code> system property.
   *
   * Uses a locale-independent, case-insensitive prefix match; <code>toUpperCase()</code> with
   * the default locale maps 'i' to a dotted capital under e.g. a Turkish locale and would miss
   * "Windows".
   */
  private static boolean isWindows() {
    String osName = System.getProperty("os.name");
    return osName != null
        && osName.regionMatches(true, 0, WINDOWS_PREFIX, 0, WINDOWS_PREFIX.length());
  }

  /**
   * Set up the run-time environment of the managed process.
   *
   * Appends the process working directory (or <code>user.dir</code> when none is set) to PATH,
   * using the platform's path separator.
   *
   * @param pb {@link ProcessBuilder} used to exec the process
   */
  private static void setupEnvironment(ProcessBuilder pb) {
    Map<String, String> env = pb.environment();

    // Add the current-working-directory to the $PATH
    File dir = pb.directory();
    String cwd = (dir != null) ? dir.getAbsolutePath() : System.getProperty("user.dir");

    String envPath = env.get(PATH);
    if (envPath == null) {
      envPath = cwd;
    } else {
      // File.pathSeparator is ":" on Unix-like systems and ";" on Windows.
      envPath = envPath + File.pathSeparator + cwd;
    }
    env.put(PATH, envPath);
  }
}
+ */ + +package org.apache.tajo.plan.function.stream; + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.*; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.CharsetDecoder; +import java.util.TimeZone; + +public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { + public static final byte[] trueBytes = "true".getBytes(); + public static final byte[] falseBytes = "false".getBytes(); + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); + + private final boolean hasTimezone; + private final TimeZone timezone; + + public TextFieldSerializerDeserializer(TableMeta meta) { + hasTimezone = meta.containsOption(StorageConstants.TIMEZONE); + timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE)); + } + + private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { + return !val.isReadable() || nullBytes.equals(val); + } + + private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { + return val.readableBytes() > 0 && nullBytes.equals(val); + } + + @Override + public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) + throws IOException { + byte[] bytes; + int length = 0; + TajoDataTypes.DataType dataType = col.getDataType(); + + if (datum == null || datum instanceof NullDatum) { + 
switch (dataType.getType()) { + case CHAR: + case TEXT: + length = nullChars.length; + out.write(nullChars); + break; + default: + break; + } + return length; + } + + switch (dataType.getType()) { + case BOOLEAN: + out.write(datum.asBool() ? trueBytes : falseBytes); + length = trueBytes.length; + break; + case CHAR: + byte[] pad = new byte[dataType.getLength() - datum.size()]; + bytes = datum.asTextBytes(); + out.write(bytes); + out.write(pad); + length = bytes.length + pad.length; + break; + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case INET4: + case DATE: + case INTERVAL: + bytes = datum.asTextBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIME: + if (hasTimezone) { + bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes(); + } else { + bytes = datum.asTextBytes(); + } + length = bytes.length; + out.write(bytes); + break; + case TIMESTAMP: + if (hasTimezone) { + bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes(); + } else { + bytes = datum.asTextBytes(); + } + length = bytes.length; + out.write(bytes); + break; + case INET6: + case BLOB: + bytes = Base64.encodeBase64(datum.asByteArray(), false); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) datum; + byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); + length = protoBytes.length; + out.write(protoBytes, 0, protoBytes.length); + break; + case NULL_TYPE: + break; + case ANY: + AnyDatum anyDatum = (AnyDatum) datum; + length = serialize(out, anyDatum.getActual(), new Column("any", anyDatum.getActual().type()), 0, nullChars); + break; + default: + throw new UnsupportedException(dataType.getType().name()); + } + return length; + } + + @Override + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { + Datum datum; + TajoDataTypes.Type type = 
col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(buf, nullChars); + } else { + nullField = isNull(buf, nullChars); + } + + if (nullField) { + datum = NullDatum.get(); + } else { + switch (type) { + case BOOLEAN: + byte bool = buf.readByte(); + datum = DatumFactory.createBool(bool == 't' || bool == 'T'); + break; + case BIT: + datum = DatumFactory.createBit(Byte.parseByte( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); + break; + case CHAR: + datum = DatumFactory.createChar( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); + break; + case INT1: + case INT2: + datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); + break; + case INT4: + datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); + break; + case INT8: + datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); + break; + case FLOAT4: + datum = DatumFactory.createFloat4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case FLOAT8: + datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); + break; + case TEXT: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createText(bytes); + break; + } + case DATE: + datum = DatumFactory.createDate( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case TIME: + if (hasTimezone) { + datum = DatumFactory.createTime( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone); + } else { + datum = DatumFactory.createTime( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + } + break; + case TIMESTAMP: + if (hasTimezone) { + datum = DatumFactory.createTimestamp( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone); + } 
else { + datum = DatumFactory.createTimestamp( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + } + break; + case INTERVAL: + datum = DatumFactory.createInterval( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + protobufJsonFormat.merge(bytes, builder); + datum = factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + break; + } + case INET4: + datum = DatumFactory.createInet4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case BLOB: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); + break; + } + default: + datum = NullDatum.get(); + break; + } + } + return datum; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java new file mode 100644 index 0000000..c2ea30c --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineDeserializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +/** + * Reads a text line and fills a Tuple with values + */ +public abstract class TextLineDeserializer { + protected final Schema schema; + protected final TableMeta meta; + protected final int[] targetColumnIndexes; + + public TextLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + this.schema = schema; + this.meta = meta; + this.targetColumnIndexes = targetColumnIndexes; + } + + /** + * Initialize SerDe + */ + public abstract void init(); + + /** + * It fills a tuple with a read fields in a given line. 
+ * + * @param buf Read line + * @param output Tuple to be filled with read fields + * @throws IOException + */ + public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError; + + /** + * Release external resources + */ + public abstract void release(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java new file mode 100644 index 0000000..9048e27 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineParsingError.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Signals that a line of text could not be parsed into a tuple.
 */
public class TextLineParsingError extends Exception {

  /**
   * @param t the underlying failure, kept as the cause
   */
  public TextLineParsingError(Throwable t) {
    super(t);
  }

  /**
   * Builds the message as <code>&lt;cause message&gt;, Error line: &lt;line&gt;</code> and keeps
   * the underlying failure as the cause, so its stack trace is not lost.
   *
   * @param message the offending line
   * @param t       the underlying failure; must not be <code>null</code>
   */
  public TextLineParsingError(String message, Throwable t) {
    super(t.getMessage() + ", Error line: " + message, t);
  }

}
+ */ + +package org.apache.tajo.plan.function.stream; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.Bytes; + +/** + * Pluggable Text Line SerDe class + */ +public abstract class TextLineSerDe { + + public TextLineSerDe() { + } + + public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); + + public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); + + public static ByteBuf getNullChars(TableMeta meta) { + byte[] nullCharByteArray = getNullCharsAsBytes(meta); + + ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); + nullChars.writeBytes(nullCharByteArray); + + return nullChars; + } + + public static byte [] getNullCharsAsBytes(TableMeta meta) { + byte [] nullChars; + + String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET); + } + + return nullChars; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java new file mode 100644 index 0000000..ea8576c --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/TextLineSerializer.java @@ -0,0 +1,45 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.function.stream; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Write a Tuple into single text formatted line + */ +public abstract class TextLineSerializer { + protected Schema schema; + protected TableMeta meta; + + public TextLineSerializer(Schema schema, TableMeta meta) { + this.schema = schema; + this.meta = meta; + } + + public abstract void init(); + + public abstract int serialize(OutputStream out, Tuple input) throws IOException; + + public abstract void release(); +}
