TAJO-1344: Python UDF support. (jihoon) Closes #526
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a7453853 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a7453853 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a7453853 Branch: refs/heads/index_support Commit: a7453853975dc8b5f4162ae16a1669aeb7ed995c Parents: 14a1e53 Author: Jihoon Son <[email protected]> Authored: Sat Apr 18 11:56:47 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Sat Apr 18 11:57:24 2015 +0900 ---------------------------------------------------------------------- CHANGES | 8 +- .../org/apache/tajo/catalog/CatalogUtil.java | 3 +- .../org/apache/tajo/catalog/FunctionDesc.java | 4 +- .../tajo/function/FunctionInvocation.java | 24 +- .../org/apache/tajo/function/FunctionUtil.java | 10 + .../tajo/function/PythonInvocationDesc.java | 98 +++++ .../src/main/proto/CatalogProtos.proto | 6 + .../apache/tajo/catalog/TestFunctionDesc.java | 4 +- .../tajo/catalog/store/AbstractDBStore.java | 5 +- .../org/apache/tajo/catalog/store/MemStore.java | 1 - .../org/apache/tajo/catalog/TestCatalog.java | 4 +- .../main/java/org/apache/tajo/QueryVars.java | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 4 + .../java/org/apache/tajo/datum/AnyDatum.java | 82 +++++ .../java/org/apache/tajo/datum/BlobDatum.java | 5 +- .../org/apache/tajo/datum/DatumFactory.java | 21 +- .../java/org/apache/tajo/json/DatumAdapter.java | 6 +- .../java/org/apache/tajo/storage/Tuple.java | 54 +-- .../java/org/apache/tajo/util/FileUtil.java | 23 ++ .../java/org/apache/tajo/util/KeyValueSet.java | 2 +- .../tajo/util/datetime/DateTimeFormat.java | 2 - .../tajo/engine/codegen/EvalCodeGenContext.java | 6 +- .../codegen/LegacyFunctionBindingEmitter.java | 4 +- .../engine/codegen/VariablesPreBuilder.java | 2 +- .../tajo/engine/function/FunctionLoader.java | 59 ++- .../apache/tajo/engine/planner/Projector.java | 2 +- .../rules/GlobalPlanEqualityTester.java | 2 +- .../planner/physical/AggregationExec.java | 2 +- .../planner/physical/BSTIndexScanExec.java | 2 +- .../engine/planner/physical/CommonJoinExec.java | 2 +- .../DistinctGroupbyFirstAggregationExec.java | 2 +- .../DistinctGroupbyHashAggregationExec.java | 2 +- .../DistinctGroupbySecondAggregationExec.java | 2 +- .../DistinctGroupbyThirdAggregationExec.java | 2 +- .../engine/planner/physical/EvalExprExec.java | 2 +- .../planner/physical/HashLeftOuterJoinExec.java | 4 +- .../engine/planner/physical/HavingExec.java | 2 +- .../engine/planner/physical/SelectionExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 4 +- .../engine/planner/physical/WindowAggExec.java | 2 +- .../apache/tajo/engine/query/QueryContext.java | 2 +- .../org/apache/tajo/master/GlobalEngine.java | 2 - .../java/org/apache/tajo/master/TajoMaster.java | 16 +- .../apache/tajo/master/exec/QueryExecutor.java | 81 ++-- .../java/org/apache/tajo/worker/TajoWorker.java | 11 +- .../main/java/org/apache/tajo/worker/Task.java | 35 +- .../apache/tajo/worker/TaskAttemptContext.java | 7 + tajo-core/src/main/resources/python/__init__.py | 17 + .../src/main/resources/python/controller.py | 330 +++++++++++++++++ .../src/main/resources/python/tajo_util.py | 103 ++++++ .../org/apache/tajo/TajoTestingCluster.java | 3 + .../apache/tajo/engine/eval/ExprTestBase.java | 19 +- .../apache/tajo/engine/eval/TestEvalTree.java | 68 ++-- .../tajo/engine/eval/TestEvalTreeUtil.java | 12 +- .../tajo/engine/function/TestMathFunctions.java | 10 +- .../engine/function/TestPythonFunctions.java | 44 +++ .../tajo/engine/query/TestGroupByQuery.java | 14 + .../tajo/engine/query/TestSelectQuery.java | 21 ++ tajo-core/src/test/resources/python/__init__.py | 17 + .../src/test/resources/python/test_funcs.py | 33 ++ .../src/test/resources/python/test_funcs.pyc | Bin 0 -> 1042 bytes .../src/test/resources/python/test_funcs2.py | 32 ++ .../testGroupbyWithPythonFunc.sql | 1 + .../testGroupbyWithPythonFunc2.sql | 1 + .../testNestedPythonFunction.sql | 1 + .../TestSelectQuery/testSelectPythonFuncs.sql | 2 + .../testSelectWithPredicateOnPythonFunc.sql | 1 + .../testGroupbyWithPythonFunc.result | 7 + .../testGroupbyWithPythonFunc2.result | 7 + .../testNestedPythonFunction.result | 7 + .../testSelectPythonFuncs.result | 7 + .../testSelectWithPredicateOnPythonFunc.result | 17 + tajo-docs/src/main/sphinx/functions.rst | 70 +++- .../src/main/sphinx/functions/json_func.rst | 1 + .../org/apache/tajo/plan/ExprAnnotator.java | 26 +- .../org/apache/tajo/plan/LogicalPlanner.java | 9 +- .../apache/tajo/plan/expr/AlgebraicUtil.java | 6 +- .../tajo/plan/expr/BetweenPredicateEval.java | 28 +- .../org/apache/tajo/plan/expr/CastEval.java | 21 +- .../org/apache/tajo/plan/expr/EvalContext.java | 45 +++ .../org/apache/tajo/plan/expr/EvalNode.java | 5 +- .../org/apache/tajo/plan/expr/EvalTreeUtil.java | 4 +- .../org/apache/tajo/plan/expr/FieldEval.java | 4 +- .../org/apache/tajo/plan/expr/FunctionEval.java | 4 +- .../tajo/plan/expr/GeneralFunctionEval.java | 56 +-- .../plan/expr/PatternMatchPredicateEval.java | 4 +- .../exprrewrite/EvalTreeOptimizationRule.java | 3 +- .../plan/exprrewrite/EvalTreeOptimizer.java | 1 + .../plan/exprrewrite/rules/ConstantFolding.java | 33 +- .../tajo/plan/function/FunctionInvoke.java | 90 +++++ .../plan/function/FunctionInvokeContext.java | 74 ++++ .../function/LegacyScalarFunctionInvoke.java | 81 ++++ .../plan/function/PythonFunctionInvoke.java | 59 +++ .../function/python/PythonScriptEngine.java | 368 +++++++++++++++++++ .../plan/function/python/TajoScriptEngine.java | 83 +++++ .../tajo/plan/function/stream/BufferPool.java | 74 ++++ .../function/stream/ByteBufInputChannel.java | 71 ++++ .../plan/function/stream/ByteBufLineReader.java | 176 +++++++++ .../function/stream/CSVLineDeserializer.java | 99 +++++ .../tajo/plan/function/stream/CSVLineSerDe.java | 42 +++ .../plan/function/stream/CSVLineSerializer.java | 118 ++++++ .../stream/FieldSerializerDeserializer.java | 36 ++ .../function/stream/FieldSplitProcessor.java | 34 ++ .../tajo/plan/function/stream/InputHandler.java | 78 ++++ .../function/stream/LineSplitProcessor.java | 45 +++ .../plan/function/stream/OutputHandler.java | 156 ++++++++ .../plan/function/stream/StreamingUtil.java | 91 +++++ .../stream/TextFieldSerializerDeserializer.java | 257 +++++++++++++ .../function/stream/TextLineDeserializer.java | 60 +++ .../function/stream/TextLineParsingError.java | 31 ++ .../plan/function/stream/TextLineSerDe.java | 65 ++++ .../function/stream/TextLineSerializer.java | 45 +++ .../rules/LogicalPlanEqualityTester.java | 2 +- .../rewrite/rules/PartitionedTableRewriter.java | 2 +- .../tajo/plan/serder/EvalNodeDeserializer.java | 16 +- .../tajo/plan/serder/EvalNodeSerializer.java | 10 +- .../plan/serder/LogicalNodeDeserializer.java | 137 +++---- tajo-plan/src/main/proto/Plan.proto | 28 +- .../org/apache/tajo/storage/TestLazyTuple.java | 2 +- .../testErrorTolerance1.json | 12 +- .../dataset/TestJsonSerDe/testVariousType.json | 2 +- 121 files changed, 3816 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4899ed2..2084737 100644 --- a/CHANGES +++ b/CHANGES @@ -4,14 +4,16 @@ Release 0.11.0 - unreleased NEW FEATURES + TAJO-1344: Python UDF support. (jihoon) + + TAJO-923: Add VAR_SAMP and VAR_POP window functions. + (Contributed by Dongjoon Hyun, Committed by jihoon) + TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner. (jinho) TAJO-921: Add STDDEV_SAMP and STDDEV_POP window functions. (Keuntae Park) - TAJO-923: Add VAR_SAMP and VAR_POP window functions. - (Contributed by Dongjoon Hyun, Committed by jihoon) - TAJO-1135: Implement queryable virtual table for cluster information. (jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 628d710..dcfad8d 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -24,7 +24,6 @@ import org.apache.tajo.DataTypeUtil; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; import org.apache.tajo.common.TajoDataTypes; @@ -418,7 +417,7 @@ public class CatalogUtil { // (a) () // (a,b) (a) - int definedSize = definedTypes == null ? 0 : definedTypes.size(); + int definedSize = definedTypes.size(); int givenParamSize = givenTypes == null ? 0 : givenTypes.size(); int paramDiff = givenParamSize - definedSize; if (paramDiff < 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java index 23d39f2..6ea6ac6 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java @@ -92,7 +92,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable, */ public Function newInstance() throws InternalException { try { - Constructor<? extends Function> cons = getFuncClass().getConstructor(); + Constructor<? extends Function> cons = getLegacyFuncClass().getConstructor(); return cons.newInstance(); } catch (Exception ioe) { throw new InternalException("Cannot initiate function " + signature); @@ -124,7 +124,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable, //////////////////////////////////////// @SuppressWarnings("unchecked") - public Class<? extends Function> getFuncClass() { + public Class<? extends Function> getLegacyFuncClass() { return invocation.getLegacy().getFunctionClass(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java index 653bdb6..911d5dd 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java @@ -35,6 +35,8 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto> StaticMethodInvocationDesc scalarJIT; @Expose ClassBaseInvocationDesc<?> aggregationJIT; + @Expose + PythonInvocationDesc python; public FunctionInvocation() { } @@ -55,6 +57,9 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto> if (proto.hasAggregationJIT()) { this.aggregationJIT = new ClassBaseInvocationDesc(proto.getAggregation()); } + if (proto.hasPython()) { + this.python = new PythonInvocationDesc(proto.getPython()); + } } public boolean isAvailable() { @@ -121,6 +126,18 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto> return aggregationJIT; } + public boolean hasPython() { + return python != null; + } + + public void setPython(PythonInvocationDesc python) { + this.python = python; + } + + public PythonInvocationDesc getPython() { + return python; + } + @Override public FunctionInvocationProto getProto() { FunctionInvocationProto.Builder builder = FunctionInvocationProto.newBuilder(); @@ -139,16 +156,19 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto> if (hasAggregationJIT()) { builder.setAggregationJIT(aggregationJIT.getProto()); } + if (hasPython()) { + builder.setPython(python.getProto()); + } return builder.build(); } @Override public int hashCode() { - return Objects.hashCode(legacy, scalar, scalarJIT); + return Objects.hashCode(legacy, scalar, scalarJIT, python); } public String toString() { return "legacy=" + hasLegacy() + ",scalar=" + hasScalar() + ",agg=" + hasAggregation() + - ",scalarJIT=" + hasScalarJIT() + ",aggJIT=" + hasAggregationJIT(); + ",scalarJIT=" + hasScalarJIT() + ",aggJIT=" + hasAggregationJIT() + ",python=" + hasPython(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java index dee5d1c..ef70428 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java @@ -18,6 +18,8 @@ package org.apache.tajo.function; +import org.apache.tajo.catalog.FunctionDesc; + import java.util.Collection; import static org.apache.tajo.common.TajoDataTypes.DataType; @@ -53,4 +55,12 @@ public class FunctionUtil { public static boolean isNullableParam(Class<?> clazz) { return !clazz.isPrimitive(); } + + public static boolean isLegacyFunction(FunctionDesc desc) { + return desc.getInvocation().hasLegacy(); + } + + public static boolean isScriptFunction(FunctionDesc desc) { + return desc.getInvocation().hasPython(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java new file mode 100644 index 0000000..160b169 --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.function; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.proto.CatalogProtos.PythonInvocationDescProto; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.util.TUtil; + +/** + * <code>PythonInvocationDesc</code> describes a function name + * and a file path to the script where the function is defined. + */ +public class PythonInvocationDesc implements ProtoObject<PythonInvocationDescProto>, Cloneable { + @Expose private String funcName; + @Expose private String filePath; + + public PythonInvocationDesc() { + + } + + public PythonInvocationDesc(String funcName, String filePath) { + this.funcName = funcName; + this.filePath = filePath; + } + + public void setFuncName(String funcName) { + this.funcName = funcName; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public PythonInvocationDesc(PythonInvocationDescProto proto) { + this(proto.getFuncName(), proto.getFilePath()); + } + + public String getName() { + return funcName; + } + + public String getPath() { + return filePath; + } + + @Override + public PythonInvocationDescProto getProto() { + PythonInvocationDescProto.Builder builder = PythonInvocationDescProto.newBuilder(); + builder.setFuncName(funcName).setFilePath(filePath); + return builder.build(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof PythonInvocationDesc) { + PythonInvocationDesc other = (PythonInvocationDesc) o; + return TUtil.checkEquals(funcName, other.funcName) && + TUtil.checkEquals(filePath, other.filePath); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(funcName, filePath); + } + + @Override + public String toString() { + return funcName + " at " + filePath; + } + + @Override + public Object clone() throws CloneNotSupportedException { + PythonInvocationDesc clone = (PythonInvocationDesc) super.clone(); + clone.funcName = funcName == null ? null : funcName; + clone.filePath = filePath == null ? null : filePath; + return clone; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 1402bbc..fd2cb19 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -411,6 +411,7 @@ message FunctionInvocationProto { optional ClassBaseInvocationDescProto aggregation = 3; optional StaticMethodInvocationDescProto scalarJIT = 4; optional ClassBaseInvocationDescProto aggregationJIT = 5; + optional PythonInvocationDescProto python = 6; } message ClassBaseInvocationDescProto { @@ -423,3 +424,8 @@ message StaticMethodInvocationDescProto { required string returnClass = 3; repeated string paramClasses = 4; } + +message PythonInvocationDescProto { + required string funcName = 1; + required string filePath = 2; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java index 92d2aa4..4a67ce6 100644 --- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java @@ -68,7 +68,7 @@ public class TestFunctionDesc { desc.setDetail("detail"); assertEquals("sum", desc.getFunctionName()); - assertEquals(TestSum.class, desc.getFuncClass()); + assertEquals(TestSum.class, desc.getLegacyFuncClass()); assertEquals(FunctionType.GENERAL, desc.getFuncType()); assertEquals(Type.INT4, desc.getReturnType().getType()); assertArrayEquals(CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8), @@ -84,7 +84,7 @@ public class TestFunctionDesc { FunctionDesc newDesc = new FunctionDesc(proto); assertEquals("sum", newDesc.getFunctionName()); - assertEquals(TestSum.class, newDesc.getFuncClass()); + assertEquals(TestSum.class, newDesc.getLegacyFuncClass()); assertEquals(FunctionType.GENERAL, newDesc.getFuncType()); assertEquals(Type.INT4, newDesc.getReturnType().getType()); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 1773033..b10e5a5 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -62,8 +62,6 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo private Connection conn; - protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>(); - protected XMLCatalogSchemaManager catalogSchemaManager; protected abstract String getCatalogDriverName(); @@ -1266,11 +1264,11 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo conn = getConnection(); pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - pstmt.setInt(1, tableId); pstmt.setString(2, partition.getPartitionName()); pstmt.setString(3, partition.getPath()); pstmt.executeUpdate(); + pstmt.close(); if (partition.getPartitionKeysCount() > 0) { pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL); @@ -1349,6 +1347,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo pstmt = conn.prepareStatement(sqlDeletePartitionKeys); pstmt.setInt(1, partitionId); pstmt.executeUpdate(); + pstmt.close(); pstmt = conn.prepareStatement(sqlDeletePartition); pstmt.setInt(1, partitionId); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index b058504..821b00c 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -30,7 +30,6 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.exception.*; -import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index 0a2a8cc..306f581 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -666,7 +666,7 @@ public class TestCatalog { FunctionDesc retrived = catalog.getFunction("test10", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB)); assertEquals(retrived.getFunctionName(), "test10"); - assertEquals(retrived.getFuncClass(), TestFunc2.class); + assertEquals(retrived.getLegacyFuncClass(), TestFunc2.class); assertEquals(retrived.getFuncType(), FunctionType.GENERAL); assertFalse(catalog.containFunction("test10", CatalogUtil.newSimpleDataTypeArray(Type.BLOB, Type.INT4))); @@ -685,7 +685,7 @@ public class TestCatalog { FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newSimpleDataTypeArray(Type.INT4)); assertEquals(retrived.getFunctionName(),"test2"); - assertEquals(retrived.getFuncClass(),TestFunc1.class); + assertEquals(retrived.getLegacyFuncClass(),TestFunc1.class); assertEquals(retrived.getFuncType(),FunctionType.UDF); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/QueryVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java index ba76d63..55ca700 100644 --- a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 8e2b3d2..bfba290 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -293,6 +293,10 @@ public class TajoConf extends Configuration { // Geo IP GEOIP_DATA("tajo.function.geoip-database-location", ""), + // Python UDF + PYTHON_CODE_DIR("tajo.function.python.code-dir", ""), + PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""), + ///////////////////////////////////////////////////////////////////////////////// // User Session Configuration // http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java new file mode 100644 index 0000000..0771a6e --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.datum; + +import com.google.gson.annotations.Expose; + +import static org.apache.tajo.common.TajoDataTypes.Type.ANY; + +/** + * <code>AnyDatum</code> can contain any types of datum. + */ +public class AnyDatum extends Datum { + @Expose Datum val; + + public AnyDatum(Datum val) { + super(ANY); + this.val = val; + } + + public Datum getActual() { + return val; + } + + @Override + public int size() { + return this.val.size(); + } + + @Override + public int hashCode() { + return val.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AnyDatum) { + AnyDatum other = (AnyDatum) obj; + return val.equals(other.val); + } + return false; + } + + @Override + public Datum equalsTo(Datum datum) { + if (datum.type() == ANY) { + AnyDatum other = (AnyDatum) datum; + return val.equalsTo(other.val); + } + return DatumFactory.createBool(false); + } + + @Override + public int compareTo(Datum datum) { + if (datum.type() == ANY) { + AnyDatum other = (AnyDatum) datum; + return val.compareTo(other.val); + } + // Any datums will be lastly appeared. + return 1; + } + + @Override + public String toString() { + return val.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java index 4f296a1..cf190e2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java @@ -23,6 +23,7 @@ package org.apache.tajo.datum; import com.google.gson.annotations.Expose; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.TUtil; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -130,8 +131,8 @@ public class BlobDatum extends Datum { BlobDatum other = (BlobDatum) obj; initFromBytes(); other.initFromBytes(); - return bb.equals(other.bb); - } + return Arrays.equals(this.val, other.val); + } return false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index 9f48cad..bd1b88f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -19,6 +19,7 @@ package org.apache.tajo.datum; import com.google.protobuf.Message; +import org.apache.commons.codec.binary.Base64; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidCastException; @@ -66,7 +67,7 @@ public class DatumFactory { case INET4: return Inet4Datum.class; case ANY: - return NullDatum.class; + return AnyDatum.class; case NULL_TYPE: return NullDatum.class; default: @@ -377,16 +378,16 @@ public class DatumFactory { return new TimestampDatum(DateTimeUtil.toJulianTimestampWithTZ(str, tz)); } - public static BlobDatum createBlob(byte[] val) { - return new BlobDatum(val); + public static BlobDatum createBlob(byte[] encoded) { + return new BlobDatum(encoded); } - public static BlobDatum createBlob(byte[] val, int offset, int length) { - return new BlobDatum(val, offset, length); + public static BlobDatum createBlob(byte[] encoded, int offset, int length) { + return new BlobDatum(encoded, offset, length); } - public static BlobDatum createBlob(String val) { - return new BlobDatum(val.getBytes()); + public static BlobDatum createBlob(String plainString) { + return new BlobDatum(Base64.encodeBase64(plainString.getBytes())); } public static Inet4Datum createInet4(int encoded) { @@ -405,6 +406,10 @@ public class DatumFactory { return new Inet4Datum(val); } + public static AnyDatum createAny(Datum val) { + return new AnyDatum(val); + } + public static Datum cast(Datum operandDatum, DataType target, @Nullable TimeZone tz) { switch (target.getType()) { case BOOLEAN: @@ -454,6 +459,8 @@ public class DatumFactory { return DatumFactory.createBlob(operandDatum.asByteArray()); case INET4: return DatumFactory.createInet4(operandDatum.asByteArray()); + case ANY: + return DatumFactory.createAny(operandDatum); default: throw new InvalidCastException(operandDatum.type(), target.getType()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java b/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java index d65559d..b9d8aef 100644 --- a/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java +++ b/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java @@ -41,8 +41,9 @@ public class DatumAdapter implements GsonSerDerAdapter<Datum> { return new TimestampDatum(CommonGsonHelper.getOrDie(jsonObject, "value").getAsLong()); case INTERVAL: String[] values = CommonGsonHelper.getOrDie(jsonObject, "value").getAsString().split(","); - return new IntervalDatum(Integer.parseInt(values[0]), Long.parseLong(values[1])); + case ANY: + return new AnyDatum(deserialize(CommonGsonHelper.getOrDie(jsonObject, "actual"), typeOfT, context)); default: return context.deserialize(CommonGsonHelper.getOrDie(jsonObject, "body"), DatumFactory.getDatumClass(TajoDataTypes.Type.valueOf(typeName))); @@ -67,6 +68,9 @@ public class DatumAdapter implements GsonSerDerAdapter<Datum> { IntervalDatum interval = (IntervalDatum)src; jsonObj.addProperty("value", interval.getMonths() + "," + interval.getMilliSeconds()); break; + case ANY: + jsonObj.add("actual", serialize(((AnyDatum) src).getActual(), typeOfSrc, context)); + break; default: jsonObj.add("body", context.serialize(src)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java index 1ba1926..aec784f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java @@ -22,58 +22,58 @@ import org.apache.tajo.datum.Datum; public interface Tuple extends Cloneable { - public int size(); + int size(); - public boolean contains(int fieldid); + boolean contains(int fieldid); - public boolean isNull(int fieldid); + boolean isNull(int fieldid); @SuppressWarnings("unused") - public boolean isNotNull(int fieldid); + boolean isNotNull(int fieldid); - public void clear(); + void clear(); - public void put(int fieldId, Datum value); + void put(int fieldId, Datum value); - public void put(int fieldId, Datum[] values); + void put(int fieldId, Datum[] values); - public void put(int fieldId, Tuple tuple); + void put(int fieldId, Tuple tuple); - public void put(Datum[] values); + void put(Datum[] values); - public Datum get(int fieldId); + Datum get(int fieldId); - public void setOffset(long offset); + void setOffset(long offset); - public long getOffset(); + long getOffset(); - public boolean getBool(int fieldId); + boolean getBool(int fieldId); - public byte getByte(int fieldId); + byte getByte(int fieldId); - public char getChar(int fieldId); + char getChar(int fieldId); - public byte [] getBytes(int fieldId); + byte [] getBytes(int fieldId); - public short getInt2(int fieldId); + short getInt2(int fieldId); - public int getInt4(int fieldId); + int getInt4(int fieldId); - public long getInt8(int fieldId); + long getInt8(int fieldId); - public float getFloat4(int fieldId); + float getFloat4(int fieldId); - public double getFloat8(int fieldId); + double getFloat8(int fieldId); - public String getText(int fieldId); + String getText(int fieldId); - public Datum getProtobufDatum(int fieldId); + Datum getProtobufDatum(int fieldId); - public Datum getInterval(int fieldId); + Datum getInterval(int fieldId); - public char [] getUnicodeChars(int fieldId); + char [] getUnicodeChars(int fieldId); - public Tuple clone() throws CloneNotSupportedException; + Tuple clone() throws CloneNotSupportedException; - public Datum[] getValues(); + Datum[] getValues(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java index 9403a2f..3e3d3a2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java @@ -19,6 +19,7 @@ package org.apache.tajo.util; import com.google.protobuf.Message; +import org.apache.commons.logging.Log; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; @@ -143,4 +144,26 @@ public class FileUtil { public static boolean isLocalPath(Path path) { return path.toUri().getScheme().equals("file"); } + + + /** + * Close the Closeable objects and <b>ignore</b> any {@link IOException} or + * null pointers. Must only be used for cleanup in exception handlers. + * + * @param log the log to record problems to at debug level. Can be null. + * @param closeables the objects to close + */ + public static void cleanup(Log log, java.io.Closeable... closeables) { + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch(IOException e) { + if (log != null && log.isDebugEnabled()) { + log.debug("Exception in closing " + c, e); + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 6af0c9e..5dba9e2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -104,7 +104,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs } else if (defaultVal != null) { return defaultVal; } else { - throw new IllegalArgumentException("No such a config key: " + key); + throw new IllegalArgumentException("No such config key: " + key); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java index c3aa71e..798b9c5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java @@ -477,7 +477,6 @@ public class DateTimeFormat { KeyWord keyword = new KeyWord(); keyword.name = (String)eachKeywordValue[0]; keyword.len = ((Integer)eachKeywordValue[1]).intValue(); - keyword.id = ((DCH_poz)eachKeywordValue[2]).getValue(); keyword.idType = ((DCH_poz)eachKeywordValue[2]); keyword.is_digit = ((Boolean)eachKeywordValue[3]).booleanValue(); keyword.date_mode = (FromCharDateMode)eachKeywordValue[4]; @@ -513,7 +512,6 @@ public class DateTimeFormat { static class KeyWord { String name; int len; - int id; DCH_poz idType; boolean is_digit; FromCharDateMode date_mode; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java index c8197b7..32fb562 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java @@ -174,7 +174,7 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter { } else if (entry.getKey().getType() == EvalType.FUNCTION) { GeneralFunctionEval function = (GeneralFunctionEval) entry.getKey(); - final String internalName = TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass()); + final String internalName = TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass()); // new and initialization of function initMethod.visitTypeInsn(Opcodes.NEW, internalName); @@ -198,12 +198,12 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter { initMethod.visitVarInsn(Opcodes.ALOAD, FUNCTION); consAdapter.aload(PARAM_TYPE_ARRAY); - consAdapter.invokeVirtual(function.getFuncDesc().getFuncClass(), "init", void.class, new Class[] {FunctionEval.ParamType[].class}); + consAdapter.invokeVirtual(function.getFuncDesc().getLegacyFuncClass(), "init", void.class, new Class[] {FunctionEval.ParamType[].class}); initMethod.visitVarInsn(Opcodes.ALOAD, 0); initMethod.visitVarInsn(Opcodes.ALOAD, FUNCTION); initMethod.visitFieldInsn(Opcodes.PUTFIELD, this.owner, entry.getValue(), - "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass()) + ";"); + "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass()) + ";"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java index 36dfe4f..6b47149 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java @@ -63,12 +63,12 @@ public class LegacyFunctionBindingEmitter { FunctionDesc desc = func.getFuncDesc(); String fieldName = context.symbols.get(func); - String funcDescName = "L" + TajoGeneratorAdapter.getInternalName(desc.getFuncClass()) + ";"; + String funcDescName = "L" + TajoGeneratorAdapter.getInternalName(desc.getLegacyFuncClass()) + ";"; context.aload(0); context.methodvisitor.visitFieldInsn(Opcodes.GETFIELD, context.owner, fieldName, funcDescName); context.aload(TUPLE); - context.invokeVirtual(desc.getFuncClass(), "eval", Datum.class, new Class[] {Tuple.class}); + context.invokeVirtual(desc.getLegacyFuncClass(), "eval", Datum.class, new Class[] {Tuple.class}); context.convertToPrimitive(func.getValueType()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java index a055b04..95ec371 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java @@ -75,7 +75,7 @@ class VariablesPreBuilder extends SimpleEvalNodeVisitor<EvalCodeGenContext> { String fieldName = function.getFuncDesc().getFunctionName() + "_" + context.seqId++; context.symbols.put(function, fieldName); context.classWriter.visitField(Opcodes.ACC_PRIVATE, fieldName, - "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass()) + ";", null, null); + "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass()) + ";", null, null); } return function; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java index 3b3e7c7..6061d1b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java @@ -25,16 +25,24 @@ import com.google.common.collect.Sets; import org.apache.commons.collections.Predicate; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.function.annotation.Description; import org.apache.tajo.engine.function.annotation.ParamOptionTypes; import org.apache.tajo.engine.function.annotation.ParamTypes; import org.apache.tajo.function.*; +import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.util.ClassUtil; +import org.apache.tajo.util.TUtil; +import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.*; @@ -44,8 +52,14 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.GENERAL; public class FunctionLoader { private static Log LOG = LogFactory.getLog(FunctionLoader.class); + public static final String PYTHON_FUNCTION_NAMESPACE = "python"; - public static Collection<FunctionDesc> load() { + /** + * Load built-in functions + * + * @return + */ + public static Map<FunctionSignature, FunctionDesc> load() { Map<FunctionSignature, FunctionDesc> map = Maps.newHashMap(); List<FunctionDesc> dd = Lists.newArrayList(); @@ -66,7 +80,48 @@ public class FunctionLoader { } } - return map.values(); + return map; + } + + /** + * Load functions that are defined by users. + * + * @param conf + * @param functionMap + * @return + * @throws IOException + */ + public static Map<FunctionSignature, FunctionDesc> loadUserDefinedFunctions(TajoConf conf, + Map<FunctionSignature, FunctionDesc> functionMap) + throws IOException { + + String[] codePaths = conf.getStrings(TajoConf.ConfVars.PYTHON_CODE_DIR.varname); + if (codePaths != null) { + FileSystem localFS = FileSystem.getLocal(conf); + for (String codePathStr : codePaths) { + Path codePath = new Path(codePathStr); + List<Path> filePaths = TUtil.newList(); + if (localFS.isDirectory(codePath)) { + for (FileStatus file : localFS.listStatus(codePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(PythonScriptEngine.FILE_EXTENSION); + } + })) { + filePaths.add(file.getPath()); + } + } else { + filePaths.add(codePath); + } + for (Path filePath : filePaths) { + for (FunctionDesc f : PythonScriptEngine.registerFunctions(filePath.toUri(), + FunctionLoader.PYTHON_FUNCTION_NAMESPACE)) { + functionMap.put(f.getSignature(), f); + } + } + } + } + return functionMap; } public static Set<FunctionDesc> findScalarFunctions() { http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java index cec1862..a73478f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java @@ -63,7 +63,7 @@ public class Projector { public void init() { for (EvalNode eval : evals) { - eval.bind(inSchema); + eval.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java index e2fd47f..e55a258 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java @@ -51,7 +51,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule { LogicalNode node = eb.getPlan(); if (node != null) { PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node); - LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), tree); + LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), null, tree); assert node.deepEquals(deserialize); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index 6d9e38a..4b53b39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -63,7 +63,7 @@ public abstract class AggregationExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); for (EvalNode aggFunction : aggFunctions) { - aggFunction.bind(inSchema); + aggFunction.bind(context.getEvalContext(), inSchema); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index be6c046..806d34c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -72,7 +72,7 @@ public class BSTIndexScanExec extends PhysicalExec { super.init(); progress = 0.0f; if (qual != null) { - qual.bind(inSchema); + qual.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index 0781041..2535edf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -54,7 +54,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { public void init() throws IOException { super.init(); if (hasJoinQual) { - joinQual.bind(inSchema); + joinQual.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 37bc5a7..94429a0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -250,7 +250,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { } for (AggregationFunctionCallEval eachFunction: aggFunctions) { - eachFunction.bind(inSchema); + eachFunction.bind(context.getEvalContext(), inSchema); eachFunction.setFirstPhase(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index e96e750..0f25d6c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -375,7 +375,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { } for (EvalNode aggFunction : aggFunctions) { - aggFunction.bind(schema); + aggFunction.bind(context.getEvalContext(), schema); } tupleSize = groupingKeyIds.length + aggFunctionsNum; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java index 7b01a9b..b394390 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -119,7 +119,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { nonDistinctAggrFunctions = eachGroupby.getAggFunctions(); if (nonDistinctAggrFunctions != null) { for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) { - eachFunction.bind(inSchema); + eachFunction.bind(context.getEvalContext(), inSchema); eachFunction.setIntermediatePhase(); } nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index 7bd71e2..e71976c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -252,7 +252,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { aggrFunctions = groupbyNode.getAggFunctions(); if (aggrFunctions != null) { for (AggregationFunctionCallEval eachFunction: aggrFunctions) { - eachFunction.bind(inSchema); + eachFunction.bind(context.getEvalContext(), inSchema); eachFunction.setFinalPhase(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java index 32ec772..4581b4a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java @@ -41,7 +41,7 @@ public class EvalExprExec extends PhysicalExec { super.init(); progress = 0.0f; for (Target target : plan.getTargets()) { - target.getEvalTree().bind(inSchema); + target.getEvalTree().bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index fa9ba94..6f573d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -121,9 +121,9 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { rightNumCols = rightChild.getSchema().size(); - joinQual.bind(inSchema); + joinQual.bind(context.getEvalContext(), inSchema); if (joinFilter != null) { - joinFilter.bind(inSchema); + joinFilter.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java index b71c770..6897e92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java @@ -39,7 +39,7 @@ public class HavingExec extends UnaryPhysicalExec { @Override public void init() throws IOException { super.init(); - qual.bind(inSchema); + qual.bind(context.getEvalContext(), inSchema); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index c090fa7..7f5bbe9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -39,7 +39,7 @@ public class SelectionExec extends UnaryPhysicalExec { @Override public void init() throws IOException { super.init(); - qual.bind(inSchema); + qual.bind(context.getEvalContext(), inSchema); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 671555c..ff9477f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -109,7 +109,7 @@ public class SeqScanExec extends ScanExec { FieldEval targetExpr = new FieldEval(column); Datum datum = NullDatum.get(); if (partitionRow != null) { - targetExpr.bind(columnPartitionSchema); + targetExpr.bind(context.getEvalContext(), columnPartitionSchema); datum = targetExpr.eval(partitionRow); } ConstEval constExpr = new ConstEval(datum); @@ -163,7 +163,7 @@ public class SeqScanExec extends ScanExec { super.init(); if (plan.hasQual()) { - qual.bind(inSchema); + qual.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index 2f1fc46..05b0418 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -183,7 +183,7 @@ public class WindowAggExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); for (EvalNode functionEval : functions) { - functionEval.bind(inSchema); + functionEval.bind(context.getEvalContext(), inSchema); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 7b3c00d..ee50221 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -25,8 +25,8 @@ import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.session.Session; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 08403ff..074f34e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -44,7 +44,6 @@ import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.DDLExecutor; import org.apache.tajo.master.exec.QueryExecutor; -import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.session.Session; import org.apache.tajo.plan.*; import org.apache.tajo.plan.logical.InsertNode; @@ -77,7 +76,6 @@ public class GlobalEngine extends AbstractService { private LogicalPlanner planner; private LogicalOptimizer optimizer; private LogicalPlanVerifier annotatedPlanVerifier; - private DistributedQueryHookManager hookManager; private QueryExecutor queryExecutor; private DDLExecutor ddlExecutor; http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 51f82f8..0a5de58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -21,10 +21,7 @@ package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.CompositeService; @@ -36,10 +33,12 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tajo.catalog.CatalogServer; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.LocalCatalogWrapper; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; +import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.metrics.CatalogMetricsGaugeSet; @@ -68,7 +67,9 @@ import java.lang.management.ThreadMXBean; import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; @@ -177,7 +178,7 @@ public class TajoMaster extends CompositeService { diagnoseTajoMaster(); this.storeManager = StorageManager.getFileStorageManager(systemConf); - catalogServer = new CatalogServer(FunctionLoader.load()); + catalogServer = new CatalogServer(loadFunctions()); addIfService(catalogServer); catalog = new LocalCatalogWrapper(catalogServer, systemConf); @@ -207,6 +208,11 @@ public class TajoMaster extends CompositeService { LOG.info("Tajo Master is initialized."); } + private Collection<FunctionDesc> loadFunctions() throws IOException { + Map<FunctionSignature, FunctionDesc> functionMap = FunctionLoader.load(); + return FunctionLoader.loadUserDefinedFunctions(systemConf, functionMap).values(); + } + private void initSystemMetrics() { systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, getMasterName()); systemMetrics.start(); http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 2eb3c5f..ad1a8e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -47,6 +47,10 @@ import org.apache.tajo.master.*; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; +import org.apache.tajo.plan.expr.EvalContext; +import org.apache.tajo.plan.expr.GeneralFunctionEval; +import org.apache.tajo.plan.function.python.PythonScriptEngine; +import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.querymaster.*; import org.apache.tajo.session.Session; @@ -113,11 +117,9 @@ public class QueryExecutor { } else if (PlannerUtil.checkIfSimpleQuery(plan)) { execSimpleQuery(queryContext, session, sql, plan, response); - // NonFromQuery indicates a form of 'select a, x+y;' } else if (PlannerUtil.checkIfNonFromQuery(plan)) { - execNonFromQuery(queryContext, session, sql, plan, response); - + execNonFromQuery(queryContext, plan, response); } else { // it requires distributed execution. So, the query is forwarded to a query master. executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response); @@ -263,37 +265,66 @@ public class QueryExecutor { response.setResultCode(ClientProtos.ResultCode.OK); } - public void execNonFromQuery(QueryContext queryContext, Session session, String query, - LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception { + public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) + throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + EvalContext evalContext = new EvalContext(); Target[] targets = plan.getRootBlock().getRawTargets(); if (targets == null) { throw new PlanningException("No targets"); } - final Tuple outTuple = new VTuple(targets.length); + try { + // start script executor + startScriptExecutors(queryContext, evalContext, targets); + final Tuple outTuple = new VTuple(targets.length); + for (int i = 0; i < targets.length; i++) { + EvalNode eval = targets[i].getEvalTree(); + eval.bind(evalContext, null); + outTuple.put(i, eval.eval(null)); + } + boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; + if (isInsert) { + InsertNode insertNode = rootNode.getChild(); + insertNonFromQuery(queryContext, insertNode, responseBuilder); + } else { + Schema schema = PlannerUtil.targetToSchema(targets); + RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); + byte[] serializedBytes = encoder.toBytes(outTuple); + ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); + serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); + serializedResBuilder.setSchema(schema.getProto()); + serializedResBuilder.setBytesNum(serializedBytes.length); + + responseBuilder.setResultSet(serializedResBuilder); + responseBuilder.setMaxRowNum(1); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + } + } finally { + // stop script executor + stopScriptExecutors(evalContext); + } + } + + public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets) + throws IOException { for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); - eval.bind(null); - outTuple.put(i, eval.eval(null)); + if (eval instanceof GeneralFunctionEval) { + GeneralFunctionEval functionEval = (GeneralFunctionEval) eval; + if (functionEval.getFuncDesc().getInvocation().hasPython()) { + TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc()); + evalContext.addScriptEngine(eval, scriptExecutor); + scriptExecutor.start(queryContext.getConf()); + } + } } - boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; - if (isInsert) { - InsertNode insertNode = rootNode.getChild(); - insertNonFromQuery(queryContext, insertNode, responseBuilder); - } else { - Schema schema = PlannerUtil.targetToSchema(targets); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); - byte[] serializedBytes = encoder.toBytes(outTuple); - ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); - serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); - serializedResBuilder.setSchema(schema.getProto()); - serializedResBuilder.setBytesNum(serializedBytes.length); - - responseBuilder.setResultSet(serializedResBuilder); - responseBuilder.setMaxRowNum(1); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + } + + public static void stopScriptExecutors(EvalContext evalContext) { + for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) { + executor.shutdown(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 1d0293b..17af71a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -33,7 +33,10 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.function.FunctionLoader; +import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.service.TajoMasterInfo; @@ -51,10 +54,7 @@ import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.JvmPauseMonitor; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; import org.apache.tajo.util.metrics.TajoSystemMetrics; @@ -65,6 +65,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -235,6 +236,8 @@ public class TajoWorker extends CompositeService { taskHistoryWriter.init(conf); historyReader = new HistoryReader(workerContext.getWorkerName(), this.systemConf); + + FunctionLoader.loadUserDefinedFunctions(systemConf, new HashMap<FunctionSignature, FunctionDesc>()); diagnoseTajoWorker(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 61a05dc..a983f78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -21,7 +21,7 @@ package org.apache.tajo.worker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - +import io.netty.handler.codec.http.QueryStringDecoder; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,25 +30,26 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.plan.serder.LogicalNodeDeserializer; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NettyClientBase; @@ -57,8 +58,6 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; -import io.netty.handler.codec.http.QueryStringDecoder; - import java.io.File; import java.io.IOException; import java.net.InetAddress; @@ -135,7 +134,7 @@ public class Task { } public void initPlan() throws IOException { - plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan()); + plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); if (scanNode != null) { for (LogicalNode node : scanNode) { @@ -173,7 +172,7 @@ public class Task { LOG.info("=================================="); LOG.info("* Stage " + request.getId() + " is initialized"); LOG.info("* InterQuery: " + interQuery - + (interQuery ? ", Use " + this.shuffleType + " shuffle":"") + + + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + ", Fragments (num: " + request.getFragments().size() + ")" + ", Fetches (total:" + request.getFetches().size() + ") :"); @@ -190,8 +189,21 @@ public class Task { LOG.info("=================================="); } + private void startScriptExecutors() throws IOException { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.start(systemConf); + } + } + + private void stopScriptExecutors() { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.shutdown(); + } + } + public void init() throws IOException { initPlan(); + startScriptExecutors(); if (context.getState() == TaskAttemptState.TA_PENDING) { // initialize a task temporal dir @@ -257,11 +269,13 @@ public class Task { } public void kill() { + stopScriptExecutors(); context.setState(TaskAttemptState.TA_KILLED); context.stop(); } public void abort() { + stopScriptExecutors(); context.stop(); } @@ -410,6 +424,7 @@ public class Task { } catch (Throwable e) { error = e ; LOG.error(e.getMessage(), e); + stopScriptExecutors(); context.stop(); } finally { if (executor != null) { @@ -487,6 +502,7 @@ public class Task { } executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); + stopScriptExecutors(); } public TaskHistory createTaskHistory() { @@ -630,6 +646,7 @@ public class Task { if (retryNum == maxRetryNum) { LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); } + stopScriptExecutors(); context.stop(); // retry task ctx.getFetchLatch().countDown(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 706e9b8..58028ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -32,6 +32,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.fragment.FileFragment; @@ -83,6 +84,8 @@ public class TaskAttemptContext { private Map<Integer, Long> partitionOutputVolume; private HashShuffleAppenderManager hashShuffleAppenderManager; + private EvalContext evalContext = new EvalContext(); + public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext, final TaskAttemptId queryId, final FragmentProto[] fragments, @@ -403,4 +406,8 @@ public class TaskAttemptContext { public HashShuffleAppenderManager getHashShuffleAppenderManager() { return hashShuffleAppenderManager; } + + public EvalContext getEvalContext() { + return evalContext; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/__init__.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/python/__init__.py b/tajo-core/src/main/resources/python/__init__.py new file mode 100644 index 0000000..8093a2f --- /dev/null +++ b/tajo-core/src/main/resources/python/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/python + +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file
