Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/882297e7 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/882297e7 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/882297e7 Branch: refs/heads/index_support Commit: 882297e788e13e991a7445a576d3dd73846291e7 Parents: 4d7910a a745385 Author: Jihoon Son <[email protected]> Authored: Sat Apr 18 12:35:34 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Sat Apr 18 12:35:34 2015 +0900 ---------------------------------------------------------------------- .travis.yml | 2 +- CHANGES | 15 + tajo-catalog/pom.xml | 4 +- .../tajo/catalog/AbstractCatalogClient.java | 90 +- .../org/apache/tajo/catalog/CatalogUtil.java | 3 +- .../org/apache/tajo/catalog/FunctionDesc.java | 4 +- .../tajo/function/FunctionInvocation.java | 24 +- .../org/apache/tajo/function/FunctionUtil.java | 10 + .../tajo/function/PythonInvocationDesc.java | 98 ++ .../src/main/proto/CatalogProtos.proto | 6 + .../apache/tajo/catalog/TestFunctionDesc.java | 4 +- tajo-catalog/tajo-catalog-drivers/pom.xml | 34 +- .../tajo-catalog-drivers/tajo-hcatalog/pom.xml | 739 -------------- .../tajo/catalog/store/HCatalogStore.java | 993 ------------------- .../catalog/store/HCatalogStoreClientPool.java | 170 ---- .../apache/tajo/catalog/store/HCatalogUtil.java | 147 --- .../tajo/catalog/store/TestHCatalogStore.java | 467 --------- .../tajo-catalog-drivers/tajo-hive/pom.xml | 351 +++++++ .../tajo/catalog/store/HiveCatalogStore.java | 980 ++++++++++++++++++ .../store/HiveCatalogStoreClientPool.java | 170 ++++ .../tajo/catalog/store/HiveCatalogUtil.java | 127 +++ .../catalog/store/TestHiveCatalogStore.java | 504 ++++++++++ .../tajo/catalog/store/AbstractDBStore.java | 5 +- .../org/apache/tajo/catalog/TestCatalog.java | 10 +- .../org/apache/tajo/cli/tsql/SimpleParser.java | 2 +- .../tajo/client/CatalogAdminClientImpl.java | 68 +- .../org/apache/tajo/client/QueryClientImpl.java | 50 +- 
.../apache/tajo/client/SessionConnection.java | 40 +- .../main/java/org/apache/tajo/QueryVars.java | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 4 + .../java/org/apache/tajo/datum/AnyDatum.java | 82 ++ .../java/org/apache/tajo/datum/BlobDatum.java | 5 +- .../org/apache/tajo/datum/DatumFactory.java | 21 +- .../java/org/apache/tajo/json/DatumAdapter.java | 6 +- .../java/org/apache/tajo/storage/Tuple.java | 54 +- .../java/org/apache/tajo/util/FileUtil.java | 23 + .../java/org/apache/tajo/util/KeyValueSet.java | 2 +- .../tajo/util/datetime/DateTimeFormat.java | 2 - tajo-core/pom.xml | 212 ---- .../tajo/engine/codegen/EvalCodeGenContext.java | 6 +- .../codegen/LegacyFunctionBindingEmitter.java | 4 +- .../engine/codegen/VariablesPreBuilder.java | 2 +- .../tajo/engine/function/FunctionLoader.java | 59 +- .../tajo/engine/function/builtin/StdDev.java | 94 -- .../tajo/engine/function/builtin/StdDevPop.java | 10 +- .../engine/function/builtin/StdDevSamp.java | 8 +- .../tajo/engine/function/builtin/VarPop.java | 42 + .../engine/function/builtin/VarPopDouble.java | 39 + .../engine/function/builtin/VarPopFloat.java | 39 + .../tajo/engine/function/builtin/VarPopInt.java | 39 + .../engine/function/builtin/VarPopLong.java | 39 + .../tajo/engine/function/builtin/VarSamp.java | 40 + .../engine/function/builtin/VarSampDouble.java | 39 + .../engine/function/builtin/VarSampFloat.java | 39 + .../engine/function/builtin/VarSampInt.java | 39 + .../engine/function/builtin/VarSampLong.java | 39 + .../tajo/engine/function/builtin/Variance.java | 94 ++ .../apache/tajo/engine/parser/SQLAnalyzer.java | 10 + .../apache/tajo/engine/planner/Projector.java | 2 +- .../rules/GlobalPlanEqualityTester.java | 2 +- .../planner/physical/AggregationExec.java | 2 +- .../planner/physical/BSTIndexScanExec.java | 2 +- .../engine/planner/physical/CommonJoinExec.java | 2 +- .../DistinctGroupbyFirstAggregationExec.java | 2 +- .../DistinctGroupbyHashAggregationExec.java | 2 +- 
.../DistinctGroupbySecondAggregationExec.java | 2 +- .../DistinctGroupbyThirdAggregationExec.java | 2 +- .../engine/planner/physical/EvalExprExec.java | 2 +- .../planner/physical/HashLeftOuterJoinExec.java | 4 +- .../engine/planner/physical/HavingExec.java | 2 +- .../engine/planner/physical/SelectionExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 4 +- .../engine/planner/physical/WindowAggExec.java | 2 +- .../apache/tajo/engine/query/QueryContext.java | 2 +- .../org/apache/tajo/master/GlobalEngine.java | 2 - .../org/apache/tajo/master/QueryInProgress.java | 6 +- .../apache/tajo/master/TajoContainerProxy.java | 38 +- .../java/org/apache/tajo/master/TajoMaster.java | 16 +- .../apache/tajo/master/exec/QueryExecutor.java | 100 +- .../apache/tajo/querymaster/QueryMaster.java | 24 +- .../tajo/worker/ExecutionBlockContext.java | 29 +- .../tajo/worker/TajoResourceAllocator.java | 20 +- .../java/org/apache/tajo/worker/TajoWorker.java | 11 +- .../main/java/org/apache/tajo/worker/Task.java | 123 ++- .../apache/tajo/worker/TaskAttemptContext.java | 7 + .../java/org/apache/tajo/worker/TaskRunner.java | 2 - .../tajo/worker/WorkerHeartbeatService.java | 10 +- .../ConnectivityCheckerRuleForTajoWorker.java | 26 +- tajo-core/src/main/proto/InternalTypes.proto | 2 +- tajo-core/src/main/resources/python/__init__.py | 17 + .../src/main/resources/python/controller.py | 330 ++++++ .../src/main/resources/python/tajo_util.py | 103 ++ .../src/main/resources/webapps/admin/query.jsp | 30 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 4 +- .../org/apache/tajo/TajoTestingCluster.java | 31 +- .../org/apache/tajo/cli/tools/TestTajoDump.java | 4 +- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 56 +- .../apache/tajo/engine/eval/ExprTestBase.java | 19 +- .../apache/tajo/engine/eval/TestEvalTree.java | 68 +- .../tajo/engine/eval/TestEvalTreeUtil.java | 12 +- .../engine/function/TestBuiltinFunctions.java | 123 +++ .../tajo/engine/function/TestMathFunctions.java | 10 +- 
.../engine/function/TestPythonFunctions.java | 44 + .../tajo/engine/query/TestAlterTablespace.java | 2 +- .../apache/tajo/engine/query/TestCTASQuery.java | 6 +- .../tajo/engine/query/TestCreateTable.java | 14 +- .../tajo/engine/query/TestGroupByQuery.java | 14 + .../tajo/engine/query/TestInsertQuery.java | 40 +- .../apache/tajo/engine/query/TestNetTypes.java | 26 +- .../tajo/engine/query/TestSelectQuery.java | 39 +- .../apache/tajo/engine/query/TestSortQuery.java | 6 +- .../tajo/engine/query/TestTablePartitions.java | 22 +- .../org/apache/tajo/jdbc/TestResultSet.java | 4 +- .../tajo/jdbc/TestTajoDatabaseMetaData.java | 22 +- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 12 +- tajo-core/src/test/resources/python/__init__.py | 17 + .../src/test/resources/python/test_funcs.py | 33 + .../src/test/resources/python/test_funcs.pyc | Bin 0 -> 1042 bytes .../src/test/resources/python/test_funcs2.py | 32 + .../create_table_various_types_for_hcatalog.sql | 50 - ...ate_table_various_types_for_hive_catalog.sql | 50 + .../testGroupbyWithPythonFunc.sql | 1 + .../testGroupbyWithPythonFunc2.sql | 1 + .../testNestedPythonFunction.sql | 1 + .../TestSelectQuery/testSelectPythonFuncs.sql | 2 + .../testSelectWithParentheses1.sql | 1 + .../testSelectWithParentheses2.sql | 1 + .../testSelectWithPredicateOnPythonFunc.sql | 1 + .../testGroupbyWithPythonFunc.result | 7 + .../testGroupbyWithPythonFunc2.result | 7 + .../testNestedPythonFunction.result | 7 + .../testSelectPythonFuncs.result | 7 + .../testSelectWithParentheses1.result | 3 + .../testSelectWithParentheses2.result | 3 + .../testSelectWithPredicateOnPythonFunc.result | 17 + tajo-dist/pom.xml | 4 +- tajo-dist/src/main/bin/tajo | 3 - .../src/main/conf/catalog-site.xml.template | 6 +- tajo-dist/src/main/conf/tajo-env.sh | 2 +- .../configuration/catalog_configuration.rst | 40 +- tajo-docs/src/main/sphinx/functions.rst | 70 +- .../src/main/sphinx/functions/json_func.rst | 1 + .../src/main/sphinx/hcatalog_integration.rst | 52 - 
tajo-docs/src/main/sphinx/hive_integration.rst | 42 + tajo-docs/src/main/sphinx/index.rst | 2 +- .../org/apache/tajo/plan/ExprAnnotator.java | 26 +- .../org/apache/tajo/plan/LogicalPlanner.java | 9 +- .../apache/tajo/plan/expr/AlgebraicUtil.java | 6 +- .../tajo/plan/expr/BetweenPredicateEval.java | 28 +- .../org/apache/tajo/plan/expr/CastEval.java | 21 +- .../org/apache/tajo/plan/expr/EvalContext.java | 45 + .../org/apache/tajo/plan/expr/EvalNode.java | 5 +- .../org/apache/tajo/plan/expr/EvalTreeUtil.java | 4 +- .../org/apache/tajo/plan/expr/FieldEval.java | 4 +- .../org/apache/tajo/plan/expr/FunctionEval.java | 4 +- .../tajo/plan/expr/GeneralFunctionEval.java | 56 +- .../plan/expr/PatternMatchPredicateEval.java | 4 +- .../exprrewrite/EvalTreeOptimizationRule.java | 3 +- .../plan/exprrewrite/EvalTreeOptimizer.java | 1 + .../plan/exprrewrite/rules/ConstantFolding.java | 33 +- .../tajo/plan/function/FunctionInvoke.java | 90 ++ .../plan/function/FunctionInvokeContext.java | 74 ++ .../function/LegacyScalarFunctionInvoke.java | 81 ++ .../plan/function/PythonFunctionInvoke.java | 59 ++ .../function/python/PythonScriptEngine.java | 368 +++++++ .../plan/function/python/TajoScriptEngine.java | 83 ++ .../tajo/plan/function/stream/BufferPool.java | 74 ++ .../function/stream/ByteBufInputChannel.java | 71 ++ .../plan/function/stream/ByteBufLineReader.java | 176 ++++ .../function/stream/CSVLineDeserializer.java | 99 ++ .../tajo/plan/function/stream/CSVLineSerDe.java | 42 + .../plan/function/stream/CSVLineSerializer.java | 118 +++ .../stream/FieldSerializerDeserializer.java | 36 + .../function/stream/FieldSplitProcessor.java | 34 + .../tajo/plan/function/stream/InputHandler.java | 78 ++ .../function/stream/LineSplitProcessor.java | 45 + .../plan/function/stream/OutputHandler.java | 156 +++ .../plan/function/stream/StreamingUtil.java | 91 ++ .../stream/TextFieldSerializerDeserializer.java | 257 +++++ .../function/stream/TextLineDeserializer.java | 60 ++ 
.../function/stream/TextLineParsingError.java | 31 + .../plan/function/stream/TextLineSerDe.java | 65 ++ .../function/stream/TextLineSerializer.java | 45 + .../plan/rewrite/rules/FilterPushDownRule.java | 2 +- .../rules/LogicalPlanEqualityTester.java | 2 +- .../rewrite/rules/PartitionedTableRewriter.java | 2 +- .../tajo/plan/serder/EvalNodeDeserializer.java | 16 +- .../tajo/plan/serder/EvalNodeSerializer.java | 10 +- .../plan/serder/LogicalNodeDeserializer.java | 143 +-- tajo-plan/src/main/proto/Plan.proto | 28 +- tajo-project/pom.xml | 1 + .../main/java/org/apache/tajo/rpc/RpcUtils.java | 34 - .../org/apache/tajo/rpc/AsyncRpcClient.java | 58 +- .../org/apache/tajo/rpc/AsyncRpcServer.java | 82 +- .../org/apache/tajo/rpc/BlockingRpcClient.java | 88 +- .../org/apache/tajo/rpc/BlockingRpcServer.java | 85 +- .../tajo/rpc/ConnectionCloseFutureListener.java | 35 + .../org/apache/tajo/rpc/NettyClientBase.java | 124 +-- .../tajo/rpc/ProtoChannelInitializer.java | 11 +- .../org/apache/tajo/rpc/RpcClientManager.java | 185 ++++ .../org/apache/tajo/rpc/RpcConnectionPool.java | 191 ---- .../org/apache/tajo/rpc/ServerCallable.java | 36 +- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 72 +- .../org/apache/tajo/rpc/TestBlockingRpc.java | 85 +- .../apache/tajo/rpc/TestRpcClientManager.java | 97 ++ .../org/apache/tajo/storage/TestLazyTuple.java | 2 +- .../testErrorTolerance1.json | 12 +- .../dataset/TestJsonSerDe/testVariousType.json | 2 +- 208 files changed, 7771 insertions(+), 4298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 3b72639,49be29a..c967b9d --- 
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@@ -373,27 -372,9 +373,27 @@@ public abstract class AbstractCatalogCl } @Override + public List<IndexDescProto> getAllIndexes() { + try { - return new ServerCallable<List<IndexDescProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { ++ return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { + + @Override + public List<IndexDescProto> call(NettyClientBase client) throws Exception { + CatalogProtocolService.BlockingInterface stub = getStub(client); + GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); + return response.getIndexList(); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public PartitionMethodDesc call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@@ -637,40 -618,17 +637,40 @@@ } @Override - public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) { + public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) { + return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns)); + } + + @Override + public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) { try { - return new 
ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); + GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); + for (String colunName : columnNames) { + builder.addColumnNames(colunName); + } CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existIndexByColumn(null, builder.build()).getValue(); + return stub.existIndexByColumnNames(null, builder.build()).getValue(); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return false; + } + } + + @Override + public boolean existIndexesByTable(final String databaseName, final String tableName) { + try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { ++ return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { + public Boolean call(NettyClientBase client) throws ServiceException { + + CatalogProtocolService.BlockingInterface stub = getStub(client); + return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue(); } }.withRetries(); } catch (ServiceException e) { @@@ -699,60 -657,20 +699,60 @@@ } } + private static String[] extractColumnNames(Column[] columns) { + String[] columnNames = new String [columns.length]; + for (int i = 0; i < columnNames.length; i++) { + columnNames[i] = columns[i].getSimpleName(); + } + return columnNames; + } + + @Override + public final IndexDesc getIndexByColumns(final String databaseName, + final String tableName, + final Column 
[] columns) { + return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns)); + } + @Override - public final IndexDesc getIndexByColumn(final String databaseName, - final String tableName, - final String columnName) { + public final IndexDesc getIndexByColumnNames(final String databaseName, + final String tableName, + final String [] columnNames) { try { - return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public IndexDesc call(NettyClientBase client) throws ServiceException { - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); + GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); + for (String columnName : columnNames) { + builder.addColumnNames(columnName); + } CatalogProtocolService.BlockingInterface stub = getStub(client); - return new IndexDesc(stub.getIndexByColumn(null, builder.build())); + return new IndexDesc(stub.getIndexByColumnNames(null, builder.build())); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName, + final String tableName) { + try { - return new ServerCallable<Collection<IndexDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { ++ return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { + @Override + public Collection<IndexDesc> call(NettyClientBase client) throws Exception { + TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName); + CatalogProtocolService.BlockingInterface stub = getStub(client); + 
GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto); + List<IndexDesc> indexDescs = TUtil.newList(); + for (IndexDescProto descProto : response.getIndexDescList()) { + indexDescs.add(new IndexDesc(descProto)); + } + return indexDescs; } }.withRetries(); } catch (ServiceException e) { @@@ -783,9 -701,27 +783,9 @@@ } @Override - public List<IndexProto> getAllIndexes() { - try { - return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<IndexProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); - return response.getIndexList(); - } - }.withRetries(); - } catch (ServiceException e) { - LOG.error(e.getMessage(), e); - return null; - } - } - - @Override public final boolean createFunction(final FunctionDesc funcDesc) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createFunction(null, funcDesc.getProto()).getValue(); http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --cc tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 3a6014d,dcfad8d..0965bc8 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@@ -24,9 -24,7 +24,8 @@@ import org.apache.tajo.DataTypeUtil import 
org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; - import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.DataType; http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --cc tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index 0000000,5b1a996..700b327 mode 000000,100644..100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@@ -1,0 -1,964 +1,980 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.tajo.catalog.store; + + import com.google.common.collect.Lists; + + import org.apache.commons.lang.StringEscapeUtils; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.metastore.TableType; + import org.apache.hadoop.hive.metastore.api.*; + import org.apache.hadoop.hive.serde.serdeConstants; + import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; + import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; + import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; + import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; + import org.apache.tajo.TajoConstants; + import org.apache.tajo.catalog.*; + import org.apache.tajo.catalog.exception.*; + import org.apache.tajo.catalog.partition.PartitionMethodDesc; + import org.apache.tajo.catalog.proto.CatalogProtos; + import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; + import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; ++import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; + import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; + import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; + import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; + import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; + 
import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; + import org.apache.tajo.catalog.statistics.TableStats; + import org.apache.tajo.common.TajoDataTypes; + import org.apache.tajo.common.exception.NotImplementedException; + import org.apache.tajo.conf.TajoConf; + import org.apache.tajo.exception.InternalException; + import org.apache.tajo.storage.StorageConstants; + import org.apache.tajo.util.KeyValueSet; + import org.apache.thrift.TException; + + import java.io.IOException; + import java.util.*; + + import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; + + public class HiveCatalogStore extends CatalogConstants implements CatalogStore { + protected final Log LOG = LogFactory.getLog(getClass()); + + private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir"; + + protected Configuration conf; + private static final int CLIENT_POOL_SIZE = 2; + private final HiveCatalogStoreClientPool clientPool; + private final String defaultTableSpaceUri; + + public HiveCatalogStore(final Configuration conf) throws InternalException { + if (!(conf instanceof TajoConf)) { + throw new CatalogException("Invalid Configuration Type:" + conf.getClass().getSimpleName()); + } + this.conf = conf; + this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) conf).toString(); + this.clientPool = new HiveCatalogStoreClientPool(CLIENT_POOL_SIZE, conf); + } + + @Override + public boolean existTable(final String databaseName, final String tableName) throws CatalogException { + boolean exist = false; + org.apache.hadoop.hive.ql.metadata.Table table; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + // get table + try { + client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + if (table != null) { + exist = true; + } + } catch (NoSuchObjectException nsoe) { + exist = false; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if 
(client != null) { + client.release(); + } + } + + return exist; + } + + @Override + public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName) throws CatalogException { + org.apache.hadoop.hive.ql.metadata.Table table = null; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + Path path = null; + CatalogProtos.StoreType storeType = null; + org.apache.tajo.catalog.Schema schema = null; + KeyValueSet options = null; + TableStats stats = null; + PartitionMethodDesc partitions = null; + + ////////////////////////////////// + // set tajo table schema. + ////////////////////////////////// + try { + // get hive table schema + try { + client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + path = table.getPath(); + } catch (NoSuchObjectException nsoe) { + throw new CatalogException("Table not found. - tableName:" + tableName, nsoe); + } catch (Exception e) { + throw new CatalogException(e); + } + + // convert HiveCatalogStore field schema into tajo field schema. + schema = new org.apache.tajo.catalog.Schema(); + + List<FieldSchema> fieldSchemaList = table.getCols(); + boolean isPartitionKey = false; + for (FieldSchema eachField : fieldSchemaList) { + isPartitionKey = false; + + if (table.getPartitionKeys() != null) { + for (FieldSchema partitionKey : table.getPartitionKeys()) { + if (partitionKey.getName().equals(eachField.getName())) { + isPartitionKey = true; + } + } + } + + if (!isPartitionKey) { + String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName + + CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName(); + TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(eachField.getType().toString()); + schema.addColumn(fieldName, dataType); + } + } + + // validate field schema. 
+ HiveCatalogUtil.validateSchema(table); + + stats = new TableStats(); + options = new KeyValueSet(); + options.putAll(table.getParameters()); + options.remove("EXTERNAL"); + + Properties properties = table.getMetadata(); + if (properties != null) { + // set field delimiter + String fieldDelimiter = "", nullFormat = ""; + if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) { + fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM); + } else { + // if hive table used default row format delimiter, Properties doesn't have it. + // So, Tajo must set as follows: + fieldDelimiter = "\u0001"; + } + + // set null format + if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) != null) { + nullFormat = properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT); + } else { + nullFormat = "\\N"; + } + options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT); + + // set file output format + String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT); + storeType = CatalogUtil.getStoreType(HiveCatalogUtil.getStoreType(fileOutputformat)); + + if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) { + options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); + options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat)); + } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) { + options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat)); + String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB); + if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); + } else if (ColumnarSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); + } + } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) { + 
options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); + options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat)); + String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB); + if (LazyBinarySerDe.class.getName().equals(serde)) { + options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); + } else if (LazySimpleSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); + } + } + + // set data size + long totalSize = 0; + if (properties.getProperty("totalSize") != null) { + totalSize = Long.parseLong(properties.getProperty("totalSize")); + } else { + try { + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + totalSize = fs.getContentSummary(path).getLength(); + } + } catch (IOException ioe) { + throw new CatalogException("Fail to get path. - path:" + path.toString(), ioe); + } + } + stats.setNumBytes(totalSize); + } + + // set partition keys + List<FieldSchema> partitionKeys = table.getPartitionKeys(); + + if (null != partitionKeys) { + org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); + StringBuilder sb = new StringBuilder(); + if (partitionKeys.size() > 0) { + for (int i = 0; i < partitionKeys.size(); i++) { + FieldSchema fieldSchema = partitionKeys.get(i); + TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString()); + String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName + + CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName(); + expressionSchema.addColumn(new Column(fieldName, dataType)); + if (i > 0) { + sb.append(","); + } + sb.append(fieldSchema.getName()); + } + partitions = new PartitionMethodDesc( + databaseName, + tableName, + PartitionType.COLUMN, + sb.toString(), + expressionSchema); + } + } + } finally { + if(client != null) 
client.release(); + } + TableMeta meta = new TableMeta(storeType, options); + TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri()); + if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) { + tableDesc.setExternal(true); + } + if (stats != null) { + tableDesc.setStats(stats); + } + if (partitions != null) { + tableDesc.setPartitionMethod(partitions); + } + return tableDesc.getProto(); + } + + + private TajoDataTypes.Type getDataType(final String typeStr) { + try { + return Enum.valueOf(TajoDataTypes.Type.class, typeStr); + } catch (IllegalArgumentException iae) { + LOG.error("Cannot find a matched type against from '" + typeStr + "'"); + return null; + } + } + + @Override + public final List<String> getAllTableNames(String databaseName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + return client.getHiveClient().getAllTables(databaseName); + } catch (TException e) { + throw new CatalogException(e); + } finally { + if(client != null) client.release(); + } + } + + @Override + public void createTablespace(String spaceName, String spaceUri) throws CatalogException { + // SKIP + } + + @Override + public boolean existTablespace(String spaceName) throws CatalogException { + // SKIP + return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME); + } + + @Override + public void dropTablespace(String spaceName) throws CatalogException { + // SKIP + } + + @Override + public Collection<String> getAllTablespaceNames() throws CatalogException { + return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME); + } + + @Override + public TablespaceProto getTablespace(String spaceName) throws CatalogException { + if (spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME)) { + TablespaceProto.Builder builder = TablespaceProto.newBuilder(); + builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME); + 
builder.setUri(defaultTableSpaceUri); + return builder.build(); + } else { + throw new CatalogException("tablespace concept is not supported in HiveCatalogStore"); + } + } + + @Override + public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws + CatalogException { + // TODO - not implemented yet + } + + @Override + public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException { + throw new CatalogException("tablespace concept is not supported in HiveCatalogStore"); + } + + @Override + public void createDatabase(String databaseName, String tablespaceName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + Database database = new Database( + databaseName, + "", + defaultTableSpaceUri + "/" + databaseName, + new HashMap<String, String>()); + client = clientPool.getClient(); + client.getHiveClient().createDatabase(database); + } catch (AlreadyExistsException e) { + throw new AlreadyExistsDatabaseException(databaseName); + } catch (Throwable t) { + throw new CatalogException(t); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public boolean existDatabase(String databaseName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + List<String> databaseNames = client.getHiveClient().getAllDatabases(); + return databaseNames.contains(databaseName); + } catch (Throwable t) { + throw new CatalogException(t); + } finally { + if (client != null) { + client.release(); + } + } + } + + /** + * Drops the given database through the pooled Hive metastore client. + * + * @param databaseName name of the database to drop + * @throws NoSuchDatabaseException if the metastore reports that the database does not exist + * @throws CatalogException if the drop fails for any other reason; the original failure is kept as the cause + */ + @Override + public void dropDatabase(String databaseName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + client.getHiveClient().dropDatabase(databaseName); + } catch (NoSuchObjectException e) { + throw new NoSuchDatabaseException(databaseName); + } catch (Throwable t) { 
+ // Wrap the failure itself (as createDatabase and existDatabase do) instead of passing only the database name. + throw new CatalogException(t); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public Collection<String> getAllDatabaseNames() throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + return client.getHiveClient().getAllDatabases(); + } catch (TException e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public final void createTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + TableDesc tableDesc = new TableDesc(tableDescProto); + String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String tableName = splitted[1]; + + try { + client = clientPool.getClient(); + + org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table(); + table.setDbName(databaseName); + table.setTableName(tableName); + table.setParameters(new HashMap<String, String>(tableDesc.getMeta().getOptions().getAllKeyValus())); + // TODO: set owner + //table.setOwner(); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setParameters(new HashMap<String, String>()); + sd.getSerdeInfo().setName(table.getTableName()); + + // if tajo set location method, thrift client make exception as follows: + // Caused by: MetaException(message:java.lang.NullPointerException) + // If you want to modify table path, you have to modify on Hive cli. 
+ if (tableDesc.isExternal()) { + table.setTableType(TableType.EXTERNAL_TABLE.name()); + table.putToParameters("EXTERNAL", "TRUE"); + + Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + if (fs.isFile(tablePath)) { + LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path."); + sd.setLocation(tablePath.getParent().toString()); + } else { + sd.setLocation(tablePath.toString()); + } + } + + // set column information + List<Column> columns = tableDesc.getSchema().getColumns(); + ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(columns.size()); + + for (Column eachField : columns) { + cols.add(new FieldSchema(eachField.getSimpleName(), + HiveCatalogUtil.getHiveFieldType(eachField.getDataType()), "")); + } + sd.setCols(cols); + + // set partition keys + if (tableDesc.hasPartition() && tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) { + List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>(); + for (Column eachPartitionKey : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(), + HiveCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), "")); + } + table.setPartitionKeys(partitionKeys); + } + + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName()); + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName()); + } else { + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName()); + } + + if 
(tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL))); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV) + || tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. 
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.TEXT_DELIMITER); + + if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL))); + table.getParameters().remove(StorageConstants.TEXT_NULL); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName()); + + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. 
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER); + } else { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName()); + } + + if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL))); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL); + } + } else { + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) { + sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName()); + sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName()); + sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName()); + } else { + throw new CatalogException(new NotImplementedException(tableDesc.getMeta().getStoreType + ().name())); + } + } + + sd.setSortCols(new ArrayList<Order>()); + + table.setSd(sd); + client.getHiveClient().createTable(table); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if(client != null) client.release(); + } + } + + @Override + public final void dropTable(String databaseName, final String tableName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + client.getHiveClient().dropTable(databaseName, tableName, false, false); + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + 
@Override + public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException { + final String[] split = CatalogUtil.splitFQTableName(alterTableDescProto.getTableName()); + + if (split.length == 1) { + throw new IllegalArgumentException("alterTable() requires a qualified table name, but it is \"" + + alterTableDescProto.getTableName() + "\"."); + } + + final String databaseName = split[0]; + final String tableName = split[1]; + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; + + switch (alterTableDescProto.getAlterTableType()) { + case RENAME_TABLE: + if (existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) { + throw new AlreadyExistsTableException(alterTableDescProto.getNewTableName()); + } + renameTable(databaseName, tableName, alterTableDescProto.getNewTableName().toLowerCase()); + break; + case RENAME_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAlterColumnName().getNewColumnName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName()); + } + renameColumn(databaseName, tableName, alterTableDescProto.getAlterColumnName()); + break; + case ADD_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAddColumn().getName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName()); + } + addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn()); + break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = 
alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(databaseName, tableName, partitionDesc); + break; + case SET_PROPERTY: + // TODO - not implemented yet + break; + default: + //TODO + } + } + + + private void renameTable(String databaseName, String tableName, String newTableName) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + client = clientPool.getClient(); + Table newTable = client.getHiveClient().getTable(databaseName, tableName); + newTable.setTableName(newTableName); + client.getHiveClient().alter_table(databaseName, tableName, newTable); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void renameColumn(String databaseName, String tableName, CatalogProtos.AlterColumnProto alterColumnProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) { + currentColumn.setName(alterColumnProto.getNewColumnName()); + } + } + client.getHiveClient().alter_table(databaseName, tableName, table); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + private void addNewColumn(String databaseName, String tableName, CatalogProtos.ColumnProto columnProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = 
clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + columns.add(new FieldSchema(columnProto.getName(), + HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), "")); + client.getHiveClient().alter_table(databaseName, tableName, table); + + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + Partition partition = new Partition(); + partition.setDbName(databaseName); + partition.setTableName(tableName); + + List<String> values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + partition.setValues(values); + + Table table = client.getHiveClient().getTable(databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setLocation(partitionDescProto.getPath()); + partition.setSd(sd); + + client.getHiveClient().add_partition(partition); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void dropPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + List<String> values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + client.getHiveClient().dropPartition(databaseName, tableName, values, true); + } catch (Exception e) { + 
throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName) + throws CatalogException { - return null; // TODO - not implemented yet ++ // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException { - return false; // TODO - not implemented yet ++ // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, + String tableName) throws CatalogException { + throw new UnsupportedOperationException(); + } + + + @Override + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + CatalogProtos.PartitionDescProto.Builder builder = null; + + try { + client = clientPool.getClient(); + + Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName); + builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partition.getSd().getLocation()); + + String[] partitionNames = partitionName.split("/"); + + for (int i = 0; i < partition.getValues().size(); i++) { + String value = partition.getValues().get(i); + 
CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + + String columnName = partitionNames[i].split("=")[0]; + keyBuilder.setColumnName(columnName); + keyBuilder.setPartitionValue(value); + builder.addPartitionKeys(keyBuilder); + } + } catch (NoSuchObjectException e) { + return null; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + return builder.build(); + } + + @Override + public final void addFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public final void deleteFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public final void existFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public final List<String> getAllFunctionNames() throws CatalogException { + // TODO - not implemented yet - return null; ++ throw new UnsupportedOperationException(); + } + + @Override - public void dropIndex(String databaseName, String indexName) throws CatalogException { ++ public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override - public boolean existIndexByName(String databaseName, String indexName) throws CatalogException { ++ public void dropIndex(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet - return false; ++ throw new UnsupportedOperationException(); + } + + @Override - public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException { ++ public CatalogProtos.IndexDescProto getIndexByName(String databaseName, 
String indexName) throws CatalogException { + // TODO - not implemented yet - return null; ++ throw new UnsupportedOperationException(); + } + + @Override - public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException { ++ public CatalogProtos.IndexDescProto getIndexByColumns(String databaseName, String tableName, String[] columnNames) ++ throws CatalogException { + // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override - public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException { ++ public boolean existIndexByName(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet - return null; ++ throw new UnsupportedOperationException(); + } + + @Override - public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName) ++ public boolean existIndexByColumns(String databaseName, String tableName, String[] columnNames) + throws CatalogException { + // TODO - not implemented yet - return null; ++ throw new UnsupportedOperationException(); + } + + @Override - public boolean existIndexByColumn(String databaseName, String tableName, String columnName) throws CatalogException { ++ public List<String> getAllIndexNamesByTable(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet - return false; ++ throw new UnsupportedOperationException(); ++ } ++ ++ @Override ++ public boolean existIndexesByTable(String databaseName, String tableName) throws CatalogException { ++ // TODO - not implemented yet ++ throw new UnsupportedOperationException(); + } + + @Override + public final void close() { + clientPool.close(); + } + + /** + * Tells whether the Hive table has a column whose name equals columnName, ignoring case. + * + * @return true if such a column is found; false if none matches or the metastore reports the table as missing + * @throws CatalogException if the metastore lookup fails for any other reason + */ + private boolean existColumn(final String databaseName ,final String tableName , final String columnName) throws CatalogException { + boolean exist = false; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + 
try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(columnName)) { + exist = true; + } + } + // This is a read-only lookup; nothing is written back to the metastore. + + } catch (NoSuchObjectException nsoe) { + // The table is missing, so the column cannot exist; fall through and return false. + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + + return exist; + } + + @Override + public List<ColumnProto> getAllColumns() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<DatabaseProto> getAllDatabases() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override - public List<IndexProto> getAllIndexes() throws CatalogException { ++ public List<IndexDescProto> getAllIndexes() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TablePartitionProto> getAllPartitions() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableOptionProto> getAllTableOptions() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableStatsProto> getAllTableStats() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableDescriptorProto> getAllTables() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TablespaceProto> getTablespaces() throws CatalogException { + throw new UnsupportedOperationException(); + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 90a51b6,9d0e427..5fa1c67 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@@ -259,124 -260,6 +259,124 @@@ public class CatalogAdminClientImpl imp } @Override + public IndexDescProto getIndex(final String indexName) throws ServiceException { - return new ServerCallable<IndexDescProto>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<IndexDescProto>(connection.manager, ++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + + @Override + public IndexDescProto call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getIndexWithName(null, + connection.convertSessionedString(indexName)); + } + }.withRetries(); + } + + @Override + public boolean existIndex(final String indexName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<Boolean>(connection.manager, ++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existIndexWithName(null, + 
connection.convertSessionedString(indexName)).getValue(); + } + }.withRetries(); + } + + @Override + public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException { - return new ServerCallable<List<IndexDescProto>>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<List<IndexDescProto>>(connection.manager, ++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + + @Override + public List<IndexDescProto> call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexesResponse response = tajoMasterService.getIndexesForTable(null, + connection.convertSessionedString(tableName)); + if (response.getResult().getResultCode() == ResultCode.OK) { + return response.getIndexesList(); + } else { + throw new SQLException(response.getResult().getErrorMessage()); + } + } + }.withRetries(); + } + + @Override + public boolean hasIndexes(final String tableName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<Boolean>(connection.manager, ++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existIndexesForTable(null, + connection.convertSessionedString(tableName)).getValue(); + } + }.withRetries(); + } + + @Override + public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException { - return new ServerCallable<IndexDescProto>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<IndexDescProto>(connection.manager, ++ connection.getTajoMasterAddr(), 
TajoMasterClientProtocol.class, false) { + + @Override + public IndexDescProto call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + for (String eachColumnName : columnNames) { + builder.addColumnNames(eachColumnName); + } + GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build()); + if (response.getResult().getResultCode() == ResultCode.OK) { + return response.getIndexDesc(); + } else { + throw new SQLException(response.getResult().getErrorMessage()); + } + } + }.withRetries(); + } + + @Override + public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<Boolean>(connection.manager, ++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + for (String eachColumnName : columnName) { + builder.addColumnNames(eachColumnName); + } + return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue(); + } + }.withRetries(); + } + + @Override + public boolean dropIndex(final String indexName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { ++ return new ServerCallable<Boolean>(connection.manager, ++ connection.getTajoMasterAddr(), 
TajoMasterClientProtocol.class, false) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.dropIndex(null, + connection.convertSessionedString(indexName)).getValue(); + } + }.withRetries(); + } + + @Override public void close() throws IOException { } } http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 02ba3cd,ad1a8e3..5f30fdd --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@@ -50,8 -47,11 +50,12 @@@ import org.apache.tajo.master.* import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; + import org.apache.tajo.plan.expr.EvalContext; + import org.apache.tajo.plan.expr.GeneralFunctionEval; + import org.apache.tajo.plan.function.python.PythonScriptEngine; + import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.querymaster.*; import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalPlan; @@@ -272,40 -262,69 +274,88 @@@ public class QueryExecutor response.setQueryId(queryInfo.getQueryId().getProto()); response.setMaxRowNum(maxRow); response.setTableDesc(desc.getProto()); - response.setResultCode(ClientProtos.ResultCode.OK); + response.setResult(IPCUtil.buildOkRequestResult()); } - public void execNonFromQuery(QueryContext queryContext, Session session, String query, - LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception { + public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) + throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + EvalContext evalContext = new EvalContext(); Target[] targets = plan.getRootBlock().getRawTargets(); if (targets == null) { throw new PlanningException("No targets"); } - final Tuple outTuple = new VTuple(targets.length); + try { + // start script executor + startScriptExecutors(queryContext, evalContext, targets); + final Tuple outTuple = new VTuple(targets.length); + for (int i = 0; i < targets.length; i++) { + EvalNode eval = targets[i].getEvalTree(); + eval.bind(evalContext, null); + outTuple.put(i, 
eval.eval(null)); + } + boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; + if (isInsert) { + InsertNode insertNode = rootNode.getChild(); + insertNonFromQuery(queryContext, insertNode, responseBuilder); + } else { + Schema schema = PlannerUtil.targetToSchema(targets); + RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); + byte[] serializedBytes = encoder.toBytes(outTuple); + ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); + serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); + serializedResBuilder.setSchema(schema.getProto()); + serializedResBuilder.setBytesNum(serializedBytes.length); + + responseBuilder.setResultSet(serializedResBuilder); + responseBuilder.setMaxRowNum(1); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); ++ responseBuilder.setResult(IPCUtil.buildOkRequestResult()); + } + } finally { + // stop script executor + stopScriptExecutors(evalContext); + } + } + + public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets) + throws IOException { for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); - eval.bind(null); - outTuple.put(i, eval.eval(null)); + if (eval instanceof GeneralFunctionEval) { + GeneralFunctionEval functionEval = (GeneralFunctionEval) eval; + if (functionEval.getFuncDesc().getInvocation().hasPython()) { + TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc()); + evalContext.addScriptEngine(eval, scriptExecutor); + scriptExecutor.start(queryContext.getConf()); + } + } } - boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; - if (isInsert) { - InsertNode insertNode = rootNode.getChild(); - insertNonFromQuery(queryContext, insertNode, 
responseBuilder); - } else { - Schema schema = PlannerUtil.targetToSchema(targets); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); - byte[] serializedBytes = encoder.toBytes(outTuple); - ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); - serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); - serializedResBuilder.setSchema(schema.getProto()); - serializedResBuilder.setBytesNum(serializedBytes.length); - - responseBuilder.setResultSet(serializedResBuilder); - responseBuilder.setMaxRowNum(1); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResult(IPCUtil.buildOkRequestResult()); ++//<<<<<<< HEAD ++// boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; ++// if (isInsert) { ++// InsertNode insertNode = rootNode.getChild(); ++// insertNonFromQuery(queryContext, insertNode, responseBuilder); ++// } else { ++// Schema schema = PlannerUtil.targetToSchema(targets); ++// RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); ++// byte[] serializedBytes = encoder.toBytes(outTuple); ++// ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); ++// serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); ++// serializedResBuilder.setSchema(schema.getProto()); ++// serializedResBuilder.setBytesNum(serializedBytes.length); ++// ++// responseBuilder.setResultSet(serializedResBuilder); ++// responseBuilder.setMaxRowNum(1); ++// responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); ++// responseBuilder.setResult(IPCUtil.buildOkRequestResult()); ++//======= + } + + public static void stopScriptExecutors(EvalContext evalContext) { + for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) { + executor.shutdown(); } } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 7d95f8b,a983f78..5544d53 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@@ -135,10 -134,22 +134,25 @@@ public class Task } public void initPlan() throws IOException { - plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan()); + plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); + updateDescsForScanNodes(NodeType.SCAN); + updateDescsForScanNodes(NodeType.PARTITIONS_SCAN); + updateDescsForScanNodes(NodeType.INDEX_SCAN); + LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); + if (scanNode != null) { + for (LogicalNode node : scanNode) { + ScanNode scan = (ScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } + + LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); + if (partitionScanNode != null) { + for (LogicalNode node : partitionScanNode) { + PartitionedTableScanNode scan = (PartitionedTableScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } interQuery = request.getProto().getInterQuery(); if (interQuery) { @@@ -178,19 -189,21 +192,32 @@@ LOG.info("=================================="); } + private void updateDescsForScanNodes(NodeType nodeType) { + assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN; + LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType); + if (scanNodes != null) { + for (LogicalNode node : scanNodes) { + ScanNode scanNode = (ScanNode) node; + descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + } + + private void 
startScriptExecutors() throws IOException { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.start(systemConf); + } + } + + private void stopScriptExecutors() { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.shutdown(); + } + } + public void init() throws IOException { initPlan(); + startScriptExecutors(); if (context.getState() == TaskAttemptState.TA_PENDING) { // initialize a task temporal dir http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --cc tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index c0b9deb,36ffd0c..6872ca5 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@@ -36,8 -36,10 +36,11 @@@ import org.apache.tajo.engine.codegen.T import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.query.QueryContext; + import org.apache.tajo.function.FunctionSignature; + import org.apache.tajo.master.exec.QueryExecutor; import org.apache.tajo.plan.*; + import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.serder.EvalNodeDeserializer; import org.apache.tajo.plan.serder.EvalNodeSerializer; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java index 35e7a91,43a8618..89efdc8 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java @@@ -45,11 -45,10 +45,11 @@@ public class LogicalPlanEqualityTester } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws PlanningException { + LogicalPlan plan = context.getPlan(); LogicalNode root = plan.getRootBlock().getRoot(); PlanProto.LogicalNodeTree serialized = 
LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot()); - LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), serialized); - LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, null, serialized); ++ LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), null, serialized); assert root.deepEquals(deserialized); return plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ----------------------------------------------------------------------
