TAJO-1131: Support inserting into, or creating, tables mapped to HBase.
Closes #232 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/69373878 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/69373878 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/69373878 Branch: refs/heads/hbase_storage Commit: 69373878b5b722ab82add0f62303019022302a73 Parents: 03c3ea2 Author: HyoungJun Kim <[email protected]> Authored: Sun Nov 16 10:12:17 2014 +0900 Committer: HyoungJun Kim <[email protected]> Committed: Sun Nov 16 10:12:17 2014 +0900 ---------------------------------------------------------------------- BUILDING | 2 + CHANGES | 4 + .../org/apache/tajo/catalog/CatalogUtil.java | 2 + .../src/main/proto/CatalogProtos.proto | 4 +- .../java/org/apache/tajo/cli/tsql/TajoCli.java | 2 +- .../main/java/org/apache/tajo/QueryVars.java | 58 + .../java/org/apache/tajo/TajoConstants.java | 1 + .../java/org/apache/tajo/conf/TajoConf.java | 5 + tajo-core/pom.xml | 27 + .../org/apache/tajo/engine/parser/SQLParser.g4 | 10 +- .../tajo/engine/function/string/ToCharLong.java | 55 + .../apache/tajo/engine/parser/SQLAnalyzer.java | 18 +- .../engine/planner/PhysicalPlannerImpl.java | 13 +- .../engine/planner/global/ExecutionBlock.java | 2 + .../planner/physical/ColPartitionStoreExec.java | 1 + .../planner/physical/ExternalSortExec.java | 4 +- .../physical/RangeShuffleFileWriteExec.java | 2 + .../engine/planner/physical/SeqScanExec.java | 6 +- .../engine/planner/physical/StoreTableExec.java | 37 +- .../apache/tajo/engine/query/QueryContext.java | 45 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 7 +- .../DefaultFragmentScheduleAlgorithm.java | 2 +- .../org/apache/tajo/master/GlobalEngine.java | 33 +- .../master/GreedyFragmentScheduleAlgorithm.java | 9 +- .../apache/tajo/master/LazyTaskScheduler.java | 18 +- .../master/NonForwardQueryResultScanner.java | 21 +- .../apache/tajo/master/querymaster/Query.java | 353 +---- .../master/querymaster/QueryMasterTask.java | 40 +- 
.../tajo/master/querymaster/QueryUnit.java | 11 +- .../tajo/master/querymaster/Repartitioner.java | 71 +- .../tajo/master/querymaster/SubQuery.java | 16 +- .../main/java/org/apache/tajo/worker/Task.java | 44 +- .../main/resources/webapps/worker/queryunit.jsp | 5 +- .../org/apache/tajo/HBaseTestClusterUtil.java | 182 +++ .../java/org/apache/tajo/QueryTestCaseBase.java | 8 +- .../org/apache/tajo/TajoTestingCluster.java | 12 +- .../TestStringOperatorsAndFunctions.java | 1 + .../tajo/engine/planner/TestPlannerUtil.java | 2 +- .../planner/physical/TestBSTIndexExec.java | 3 +- .../planner/physical/TestPhysicalPlanner.java | 3 +- .../tajo/engine/query/TestHBaseTable.java | 1474 ++++++++++++++++++ .../tajo/worker/TestRangeRetrieverHandler.java | 4 +- .../dataset/TestHBaseTable/splits.data | 4 + .../TestHBaseTable/testBinaryMappedQuery.result | 81 + .../results/TestHBaseTable/testCATS.result | 100 ++ .../testColumnKeyValueSelectQuery.result | 12 + .../TestHBaseTable/testIndexPredication.result | 38 + .../TestHBaseTable/testInsertInto.result | 3 + .../testInsertIntoBinaryMultiRegion.result | 100 ++ .../testInsertIntoColumnKeyValue.result | 21 + .../testInsertIntoMultiRegion.result | 100 ++ .../testInsertIntoMultiRegion2.result | 100 ++ ...stInsertIntoMultiRegionMultiRowFields.result | 100 ++ ...estInsertIntoMultiRegionWithSplitFile.result | 100 ++ .../testInsertIntoRowField.result | 4 + .../testInsertIntoUsingPut.result | 3 + .../results/TestHBaseTable/testJoin.result | 7 + .../TestHBaseTable/testNonForwardQuery.result | 102 ++ .../testRowFieldSelectQuery.result | 88 ++ .../TestHBaseTable/testSimpleSelectQuery.result | 88 ++ tajo-dist/src/main/bin/tajo | 15 + tajo-dist/src/main/conf/tajo-env.sh | 3 + .../org/apache/tajo/plan/LogicalOptimizer.java | 7 + .../org/apache/tajo/plan/logical/SortNode.java | 20 +- .../rewrite/rules/PartitionedTableRewriter.java | 2 +- .../org/apache/tajo/plan/util/PlannerUtil.java | 112 +- .../plan/verifier/PreLogicalPlanVerifier.java | 3 + 
tajo-project/pom.xml | 1 + tajo-storage/pom.xml | 23 + .../java/org/apache/tajo/storage/CSVFile.java | 10 +- .../org/apache/tajo/storage/FileAppender.java | 28 +- .../apache/tajo/storage/FileStorageManager.java | 237 ++- .../storage/HashShuffleAppenderManager.java | 1 - .../org/apache/tajo/storage/MergeScanner.java | 8 +- .../java/org/apache/tajo/storage/RawFile.java | 7 +- .../java/org/apache/tajo/storage/RowFile.java | 6 +- .../org/apache/tajo/storage/StorageManager.java | 787 +++++++++- .../apache/tajo/storage/StorageProperty.java | 40 + .../apache/tajo/storage/avro/AvroAppender.java | 8 +- .../tajo/storage/fragment/FileFragment.java | 6 +- .../storage/fragment/FragmentConvertor.java | 25 +- .../storage/hbase/AbstractHBaseAppender.java | 223 +++ .../storage/hbase/AddSortForInsertRewriter.java | 87 ++ .../tajo/storage/hbase/ColumnMapping.java | 236 +++ .../HBaseBinarySerializerDeserializer.java | 97 ++ .../tajo/storage/hbase/HBaseFragment.java | 198 +++ .../tajo/storage/hbase/HBasePutAppender.java | 120 ++ .../apache/tajo/storage/hbase/HBaseScanner.java | 445 ++++++ .../storage/hbase/HBaseStorageConstants.java | 33 + .../tajo/storage/hbase/HBaseStorageManager.java | 1126 +++++++++++++ .../hbase/HBaseTextSerializerDeserializer.java | 71 + .../tajo/storage/hbase/HFileAppender.java | 167 ++ .../tajo/storage/hbase/IndexPredication.java | 61 + .../tajo/storage/hbase/RowKeyMapping.java | 40 + .../tajo/storage/parquet/ParquetAppender.java | 9 +- .../org/apache/tajo/storage/rcfile/RCFile.java | 6 +- .../sequencefile/SequenceFileAppender.java | 6 +- .../tajo/storage/text/DelimitedTextFile.java | 7 +- .../tajo/storage/trevni/TrevniAppender.java | 6 +- .../src/main/proto/StorageFragmentProtos.proto | 35 + .../src/main/resources/storage-default.xml | 25 +- .../tajo/storage/hbase/TestColumnMapping.java | 95 ++ .../storage/hbase/TestHBaseStorageManager.java | 109 ++ 103 files changed, 7530 insertions(+), 689 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/BUILDING ---------------------------------------------------------------------- diff --git a/BUILDING b/BUILDING index 974fac8..f318f88 100644 --- a/BUILDING +++ b/BUILDING @@ -44,6 +44,8 @@ Maven build goals: * Use -Dtar to create a TAR with the distribution (using -Pdist) * Use -Dhadoop.version to build with the specific hadoop version (-Dhadoop.version=2.5.1) * Currently, 2.3.0 or higher are supported. + * Use -Dhbase.version to build with the specific hbase version (-Dhbase.version=0.98.7-hadoop2) + * Currently, 0.98.x-hadoop2 or higher are tested. Tests options: * Use -DskipTests to skip tests when running the following Maven goals: http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e37d44b..2b723d4 100644 --- a/CHANGES +++ b/CHANGES @@ -5,6 +5,10 @@ Release 0.9.1 - unreleased NEW FEATURES + TAJO-1131: Supports Inserting or Creating table into + the HBase mapped table.(Hyoungjun Kim) + + TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim) TAJO-233: Support PostgreSQL CatalogStore. 
(Jihun Kang via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 6e66d2a..66417df 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -280,6 +280,8 @@ public class CatalogUtil { return StoreType.AVRO; } else if (typeStr.equalsIgnoreCase(StoreType.TEXTFILE.name())) { return StoreType.TEXTFILE; + } else if (typeStr.equalsIgnoreCase(StoreType.HBASE.name())) { + return StoreType.HBASE; } else { return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 31a7446..9c475aa 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -37,6 +37,7 @@ enum StoreType { SEQUENCEFILE = 8; AVRO = 9; TEXTFILE = 10; + HBASE = 11; } enum OrderType { @@ -69,7 +70,8 @@ message SchemaProto { message FragmentProto { required string id = 1; - required bytes contents = 2; + required string storeType = 2; + required bytes contents = 3; } message FileFragmentProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git 
a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 34e2170..63ff873 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -25,13 +25,13 @@ import jline.UnsupportedTerminal; import jline.console.ConsoleReader; import org.apache.commons.cli.*; import org.apache.tajo.*; +import org.apache.tajo.ipc.*; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.util.FileUtil; import java.io.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/QueryVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java new file mode 100644 index 0000000..ba76d63 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.tajo.validation.Validator; + +public enum QueryVars implements ConfigKey { + COMMAND_TYPE, + STAGING_DIR, + OUTPUT_TABLE_NAME, + OUTPUT_TABLE_PATH, + OUTPUT_PARTITIONS, + OUTPUT_OVERWRITE, + OUTPUT_AS_DIRECTORY, + OUTPUT_PER_FILE_SIZE, + ; + + QueryVars() { + } + + @Override + public String keyname() { + return name().toLowerCase(); + } + + @Override + public ConfigType type() { + return ConfigType.QUERY; + } + + @Override + public Class<?> valueClass() { + return null; + } + + @Override + public Validator validator() { + return null; + } +} + + http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java index 08909b4..de09f09 100644 --- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java @@ -39,6 +39,7 @@ public class TajoConstants { public static final String SYSTEM_HA_BACKUP_DIR_NAME = "backup"; public static final int UNKNOWN_ROW_NUMBER = -1; + public static final int UNKNOWN_LENGTH = -1; private TajoConstants() {} } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 3966410..d96bcdd 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -302,6 +302,11 @@ public class TajoConf extends Configuration { 
HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), // Misc ------------------------------------------------------------------- + // Fragment + // When making physical plan, the length of fragment is used to determine the physical operation. + // Some storage does not know the size of the fragment. + // In this case PhysicalPlanner uses this value to determine. + FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH("tajo.fragment.alternative.unknown.length", (long)(512 * 1024 * 1024)), // Geo IP GEOIP_DATA("tajo.function.geoip-database-location", ""), http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index fce96e4..b58ae89 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -396,6 +396,33 @@ <artifactId>gmetric4j</artifactId> <version>1.0.3</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 
b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index beba248..9b63a24 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -89,11 +89,11 @@ if_exists ; create_table_statement - : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING file_type=identifier - (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal) - | CREATE TABLE (if_not_exists)? table_name table_elements (USING file_type=identifier)? + : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier + (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)? + | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? (AS query_expression)? - | CREATE TABLE (if_not_exists)? table_name (USING file_type=identifier)? + | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? AS query_expression | CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name ; @@ -1559,7 +1559,7 @@ null_ordering insert_statement : INSERT (OVERWRITE)? INTO table_name (LEFT_PAREN column_name_list RIGHT_PAREN)? query_expression - | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING file_type=identifier (param_clause)?)? query_expression + | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING storage_type=identifier (param_clause)?)? 
query_expression ; /* http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java new file mode 100644 index 0000000..5fed940 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.function.string; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.engine.function.annotation.Description; +import org.apache.tajo.engine.function.annotation.ParamTypes; +import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.storage.Tuple; + +import java.text.DecimalFormat; + +@Description( + functionName = "to_char", + description = "convert integer to string.", + example = "> SELECT to_char(125, '00999');\n" + + "00125", + returnType = TajoDataTypes.Type.TEXT, + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.TEXT})} +) + +public class ToCharLong extends GeneralFunction { + DecimalFormat df = null; + + public ToCharLong() { + super(new Column[]{new Column("val", TajoDataTypes.Type.INT8), new Column("format", TajoDataTypes.Type.TEXT)}); + } + + @Override + public Datum eval(Tuple params) { + if (df == null) { + df = new DecimalFormat(params.get(1).asChars()); + } + return new TextDatum(df.format(params.get(0).asInt8())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index 03b10c9..40e5f8a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -29,6 +29,7 @@ import org.apache.tajo.algebra.Aggregation.GroupType; import org.apache.tajo.algebra.LiteralValue.LiteralType; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.engine.parser.SQLParser.*; +import 
org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.StringUtils; @@ -62,6 +63,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { try { context = parser.sql(); } catch (SQLParseError e) { + e.printStackTrace(); throw new SQLSyntaxError(e); } return visitSql(context); @@ -1162,12 +1164,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { createTable.setExternal(); ColumnDefinition[] elements = getDefinitions(ctx.table_elements()); - String fileType = ctx.file_type.getText(); - String path = stripQuote(ctx.path.getText()); - + String storageType = ctx.storage_type.getText(); createTable.setTableElements(elements); - createTable.setStorageType(fileType); - createTable.setLocation(path); + createTable.setStorageType(storageType); + + if (PlannerUtil.isFileStorageType(storageType)) { + String path = stripQuote(ctx.path.getText()); + createTable.setLocation(path); + } } else { if (checkIfExist(ctx.table_elements())) { ColumnDefinition[] elements = getDefinitions(ctx.table_elements()); @@ -1175,7 +1179,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { } if (checkIfExist(ctx.USING())) { - String fileType = ctx.file_type.getText(); + String fileType = ctx.storage_type.getText(); createTable.setStorageType(fileType); } @@ -1449,7 +1453,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { insertExpr.setLocation(stripQuote(ctx.path.getText())); if (ctx.USING() != null) { - insertExpr.setStorageType(ctx.file_type.getText()); + insertExpr.setStorageType(ctx.storage_type.getText()); if (ctx.param_clause() != null) { insertExpr.setParams(escapeTableMeta(getParams(ctx.param_clause()))); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 5f47db7..98a621e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -29,11 +29,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.physical.*; @@ -250,11 +252,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException { long size = 0; for (String tableId : tableIds) { - // TODO - CSV is a hack. 
- List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, - ctx.getTables(tableId)); - for (FileFragment frag : fragments) { - size += frag.getLength(); + FragmentProto[] fragmentProtos = ctx.getTables(tableId); + List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); + for (Fragment frag : fragments) { + size += StorageManager.getFragmentLength(ctx.getConf(), frag); } } return size; @@ -1182,7 +1183,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName()); List<FileFragment> fragments = - FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos); + FragmentConvertor.convert(ctx.getConf(), fragmentProtos); String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); Path indexPath = new Path( http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 77eb32d..aecb364 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -82,6 +82,8 @@ public class ExecutionBlock { } else if (node instanceof TableSubQueryNode) { TableSubQueryNode subQuery = (TableSubQueryNode) node; s.add(s.size(), subQuery.getSubQuery()); + } else if (node instanceof StoreTableNode) { + store = (StoreTableNode)node; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 7818fd7..c5df5f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -120,6 +120,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { super.init(); storeTablePath = context.getOutputPath(); + FileSystem fs = storeTablePath.getFileSystem(context.getConf()); if (!fs.exists(storeTablePath.getParent())) { fs.mkdirs(storeTablePath.getParent()); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 6a69763..4e19114 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -173,7 +173,7 @@ public class ExternalSortExec extends SortExec { long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); - final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath); + final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); appender.init(); for (Tuple t : tupleBlock) { appender.addTuple(t); @@ -471,7 +471,7 @@ public class ExternalSortExec extends SortExec { final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); 
info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); long mergeStartTime = System.currentTimeMillis(); - final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath); + final RawFileAppender output = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); output.init(); final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout); merger.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index a9ca836..568c6ec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -27,6 +27,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.index.bst.BSTIndex; http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index b863e4f..b2ef278 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -80,8 +80,7 @@ public class SeqScanExec extends PhysicalExec { String pathNameKey = ""; if (fragments != null) { for (FragmentProto f : fragments) { - Fragment fragement = FragmentConvertor.convert( - context.getConf(), plan.getTableDesc().getMeta().getStoreType(), f); + Fragment fragement = FragmentConvertor.convert(context.getConf(), f); pathNameKey += fragement.getKey(); } } @@ -216,8 +215,7 @@ public class SeqScanExec extends PhysicalExec { if (fragments != null) { if (fragments.length > 1) { this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(), - FragmentConvertor.convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(), - fragments), projected + FragmentConvertor.convert(context.getConf(), fragments), projected ); } else { StorageManager storageManager = StorageManager.getStorageManager( http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 725478f..a5e0b5d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -23,11 +23,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.StatisticsUtil; import 
org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.PersistentStoreNode; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; @@ -78,31 +80,32 @@ public class StoreTableExec extends UnaryPhysicalExec { } public void openNewFile(int suffixId) throws IOException { - String prevFile = null; + Schema appenderSchema = (plan instanceof InsertNode) ? ((InsertNode) plan).getTableSchema() : outSchema; - lastFileName = context.getOutputPath(); - if (suffixId > 0) { - prevFile = lastFileName.toString(); + if (PlannerUtil.isFileStorageType(meta.getStoreType())) { + String prevFile = null; - lastFileName = new Path(lastFileName + "_" + suffixId); - } + lastFileName = context.getOutputPath(); + + if (suffixId > 0) { + prevFile = lastFileName.toString(); + lastFileName = new Path(lastFileName + "_" + suffixId); + } + + appender = StorageManager.getFileStorageManager(context.getConf()).getAppender(meta, appenderSchema, lastFileName); - if (plan instanceof InsertNode) { - InsertNode createTableNode = (InsertNode) plan; - appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(meta, - createTableNode.getTableSchema(), context.getOutputPath()); + if (suffixId > 0) { + LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + + "The remain output will be written into " + lastFileName.toString()); + } } else { - appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(meta, - outSchema, lastFileName); + appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender( + context.getQueryContext(), + context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir()); } appender.enableStats(); appender.init(); - 
- if (suffixId > 0) { - LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + - "The remain output will be written into " + lastFileName.toString()); - } } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index d8f7f08..488cae5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -21,11 +21,11 @@ package org.apache.tajo.engine.query; import org.apache.hadoop.fs.Path; import org.apache.tajo.ConfigKey; import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.session.Session; -import org.apache.tajo.validation.Validator; import org.apache.tajo.plan.logical.NodeType; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; @@ -34,41 +34,6 @@ import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetPro * QueryContent is a overridable config, and it provides a set of various configs for a query instance. 
*/ public class QueryContext extends OverridableConf { - public static enum QueryVars implements ConfigKey { - COMMAND_TYPE, - STAGING_DIR, - OUTPUT_TABLE_NAME, - OUTPUT_TABLE_PATH, - OUTPUT_PARTITIONS, - OUTPUT_OVERWRITE, - OUTPUT_AS_DIRECTORY, - OUTPUT_PER_FILE_SIZE, - ; - - QueryVars() { - } - - @Override - public String keyname() { - return name().toLowerCase(); - } - - @Override - public ConfigType type() { - return ConfigType.QUERY; - } - - @Override - public Class<?> valueClass() { - return null; - } - - @Override - public Validator validator() { - return null; - } - } - public QueryContext(TajoConf conf) { super(conf, ConfigKey.ConfigType.QUERY); } @@ -103,8 +68,8 @@ public class QueryContext extends OverridableConf { } public Path getStagingDir() { - String strVal = get(QueryVars.STAGING_DIR); - return strVal != null ? new Path(strVal) : null; + String strVal = get(QueryVars.STAGING_DIR, ""); + return strVal != null && !strVal.isEmpty() ? new Path(strVal) : null; } /** @@ -127,7 +92,9 @@ public class QueryContext extends OverridableConf { } public void setOutputPath(Path path) { - put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); + if (path != null) { + put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); + } } public Path getOutputPath() { http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index aeb4e05..3bb1b5b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -125,12 +125,13 @@ public class TupleUtil { Tuple startTuple = new VTuple(target.size()); Tuple endTuple = new VTuple(target.size()); int i = 0; + int sortSpecIndex = 0; // In outer 
join, empty table could be searched. // As a result, min value and max value would be null. // So, we should put NullDatum for this case. for (Column col : target.getColumns()) { - if (sortSpecs[i].isAscending()) { + if (sortSpecs[sortSpecIndex].isAscending()) { if (statSet.get(col).getMinValue() != null) startTuple.put(i, statSet.get(col).getMinValue()); else @@ -164,6 +165,10 @@ public class TupleUtil { else endTuple.put(i, DatumFactory.createNullDatum()); } + if (target.getColumns().size() == sortSpecs.length) { + // Not composite column sort + sortSpecIndex++; + } i++; } return new TupleRange(sortSpecs, startTuple, endTuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java index 6a2a705..406550d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java @@ -87,7 +87,7 @@ public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorit diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); } for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds[i], fragmentPair); + addFragment(hosts[i], diskIds != null ? 
diskIds[i] : -1, fragmentPair); } fragmentNum++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 90d8dc7..d87ca30 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -303,6 +303,15 @@ public class GlobalEngine extends AbstractService { responseBuilder.setResultCode(ClientProtos.ResultCode.OK); } } else { // it requires distributed execution. So, the query is forwarded to a query master. + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType); + StorageProperty storageProperty = sm.getStorageProperty(); + if (!storageProperty.isSupportsInsertInto()) { + throw new VerifyException("Inserting into non-file storage is not supported."); + } + sm.beforeInsertOrCATS(rootNode.getChild()); + } context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); @@ -519,6 +528,7 @@ public class GlobalEngine extends AbstractService { LOG.info("============================================="); annotatedPlanVerifier.verify(queryContext, state, plan); + verifyInsertTableSchema(queryContext, state, plan); if (!state.verified()) { StringBuilder sb = new StringBuilder(); @@ -531,6 +541,25 @@ public class GlobalEngine extends AbstractService { return plan; } + private void verifyInsertTableSchema(QueryContext queryContext, VerificationState state, LogicalPlan plan) { + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + if (rootNode.getChild().getType() == 
NodeType.INSERT) { + try { + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + InsertNode iNode = rootNode.getChild(); + Schema outSchema = iNode.getChild().getOutSchema(); + + StorageManager.getStorageManager(queryContext.getConf(), storeType) + .verifyInsertTableSchema(tableDesc, outSchema); + } catch (Throwable t) { + state.addVerification(t.getMessage()); + } + } + } + } + /** * Alter a given table */ @@ -693,7 +722,7 @@ public class GlobalEngine extends AbstractService { meta = CatalogUtil.newTableMeta(createTable.getStorageType()); } - if(StorageUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){ + if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){ Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given."); } @@ -735,7 +764,7 @@ public class GlobalEngine extends AbstractService { desc.setPartitionMethod(partitionDesc); } - StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc); + StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists); if (catalog.createTable(desc)) { LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")"); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java index 3798399..56cf8e5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java @@ -107,7 +107,7 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith diskIds = 
((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); } for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds[i], fragmentPair); + addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair); } totalFragmentNum++; } @@ -285,21 +285,22 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); } for (int i = 0; i < hosts.length; i++) { + int diskId = diskIds == null ? -1 : diskIds[i]; String normalizedHost = NetUtils.normalizeHost(hosts[i]); Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost); if (diskFragmentMap != null) { - FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]); + FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId); if (fragmentsPerDisk != null) { boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair); if (isRemoved) { if (fragmentsPerDisk.size() == 0) { - diskFragmentMap.remove(diskIds[i]); + diskFragmentMap.remove(diskId); if (diskFragmentMap.size() == 0) { fragmentHostMapping.remove(normalizedHost); } } - HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]); + HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId); if (totalHostPriority.containsKey(hostAndDisk)) { PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk); updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index 50a118e..aff4b7d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryUnitRequest; @@ -38,6 +40,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.QueryUnitAttempt; import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; @@ -368,6 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { long taskSize = adjustTaskSize(); LOG.info("Adjusted task size: " + taskSize); + TajoConf conf = subQuery.getContext().getConf(); // host local, disk local String normalized = NetUtils.normalizeHost(host); Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID); @@ -378,13 +382,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { break; } - if (assignedFragmentSize + fragmentPair.getLeftFragment().getLength() > taskSize) { + if (assignedFragmentSize + + StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { break; } else { fragmentPairs.add(fragmentPair); - assignedFragmentSize += fragmentPair.getLeftFragment().getLength(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += 
fragmentPair.getRightFragment().getLength(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); } } scheduledFragments.removeFragment(fragmentPair); @@ -400,13 +405,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { break; } - if (assignedFragmentSize + fragmentPair.getLeftFragment().getLength() > taskSize) { + if (assignedFragmentSize + + StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { break; } else { fragmentPairs.add(fragmentPair); - assignedFragmentSize += fragmentPair.getLeftFragment().getLength(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += fragmentPair.getRightFragment().getLength(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); } } scheduledFragments.removeFragment(fragmentPair); http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java index 26cfb2e..37fa4fe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java @@ -35,6 +35,9 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.worker.TaskAttemptContext; 
import java.io.IOException; @@ -42,7 +45,7 @@ import java.util.ArrayList; import java.util.List; public class NonForwardQueryResultScanner { - private static final int MAX_FILE_NUM_PER_SCAN = 100; + private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; private QueryId queryId; private String sessionId; @@ -55,7 +58,7 @@ public class NonForwardQueryResultScanner { private TajoConf tajoConf; private ScanNode scanNode; - private int currentFileIndex = 0; + private int currentFragmentIndex = 0; public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId, QueryId queryId, @@ -77,22 +80,24 @@ public class NonForwardQueryResultScanner { } private void initSeqScanExec() throws IOException { - FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc, - currentFileIndex, MAX_FILE_NUM_PER_SCAN); - if (fragments != null && fragments.length > 0) { + List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) + .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + + if (fragments != null && !fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); this.taskContext = new TaskAttemptContext( new QueryContext(tajoConf), null, new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0), - fragments, null); + fragmentProtos, null); try { // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table. 
- scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragments); + scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos); } catch (CloneNotSupportedException e) { throw new IOException(e.getMessage(), e); } scanExec.init(); - currentFileIndex += fragments.length; + currentFragmentIndex += fragments.size(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index caeadea..218798b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -23,7 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.EventHandler; @@ -32,31 +31,27 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import 
org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.plan.logical.CreateTableNode; -import org.apache.tajo.plan.logical.InsertNode; -import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.*; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.util.history.SubQueryHistory; import java.io.IOException; -import java.text.NumberFormat; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -410,8 +405,12 @@ public class Query implements EventHandler<QueryEvent> { QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent; QueryState finalState; if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) { - finalizeQuery(query, subQueryEvent); - finalState = QueryState.QUERY_SUCCEEDED; + boolean success = finalizeQuery(query, subQueryEvent); + if (success) { + finalState = QueryState.QUERY_SUCCEEDED; + } else { + finalState = QueryState.QUERY_ERROR; + } } else if (subQueryEvent.getState() == SubQueryState.FAILED) { finalState = QueryState.QUERY_FAILED; } else if (subQueryEvent.getState() == SubQueryState.KILLED) { @@ -419,324 +418,48 @@ public class Query implements EventHandler<QueryEvent> { } else { finalState = QueryState.QUERY_ERROR; } + if (finalState != QueryState.QUERY_SUCCEEDED) { + SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId()); + if (lastStage != null && lastStage.getTableMeta() != null) { + StoreType storeType = lastStage.getTableMeta().getStoreType(); + if (storeType != null) { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + try { + 
StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + } query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); query.setFinishTime(); return finalState; } - private void finalizeQuery(Query query, QueryCompletedEvent event) { - MasterPlan masterPlan = query.getPlan(); + private boolean finalizeQuery(Query query, QueryCompletedEvent event) { + SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId()); + StoreType storeType = lastStage.getTableMeta().getStoreType(); + try { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - ExecutionBlock terminal = query.getPlan().getTerminalBlock(); - DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId()); - Path finalOutputDir = commitOutputData(query); + Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType) + .commitOutputData(query.context.getQueryContext(), + lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc); - QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); - try { - hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), - finalOutputDir); + QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); + hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); + return true; } catch (Exception e) { + LOG.error(e.getMessage(), e); query.eventHandler.handle(new 
QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); + return false; } } - /** - * It moves a result data stored in a staging output dir into a final output dir. - */ - public Path commitOutputData(Query query) { - QueryContext queryContext = query.context.getQueryContext(); - Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; - if (queryContext.hasOutputPath()) { - finalOutputDir = queryContext.getOutputPath(); - try { - FileSystem fs = stagingResultDir.getFileSystem(query.systemConf); - - if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - - if (queryContext.hasPartition()) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = TUtil.newHashMap(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. 
- Map<Path, Path> recoveryDirs = TUtil.newHashMap(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } - - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } - - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - throw new IOException(ioe.getMessage()); - } - } else { - try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. 
- fs.mkdirs(finalOutputDir.getParent()); - } - - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); - } - } - } - } else { - NodeType queryType = queryContext.getCommandType(); - - if (queryType == NodeType.INSERT) { // INSERT INTO an existing table - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - - if (queryContext.hasPartition()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - fs.rename(stagingResultDir, finalOutputDir); - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } - } - } catch (IOException e) { - // TODO report to client - e.printStackTrace(); - } - } else { - finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); - } - - return finalOutputDir; - } - - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. 
- * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. - String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } - } - } - } - - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; - - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; - } - } - - return retValue; - } - - private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - if (eachFile.isFile()) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - return false; - } else { - if (verifyAllFileMoved(fs, 
eachFile.getPath())) { - fs.delete(eachFile.getPath(), false); - } else { - return false; - } - } - } - } - - return true; - } - - /** - * Attach the sequence number to a path. - * - * @param path Path - * @param seq sequence number - * @param nf Number format - * @return New path attached with sequence number - * @throws IOException - */ - private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { - String[] tokens = path.getName().split("-"); - if (tokens.length != 4) { - throw new IOException("Wrong result file name:" + path); - } - return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); - } - - /** - * Attach the sequence number to the output file name and than move the file into the final result path. - * - * @param fs FileSystem - * @param stagingResultDir The staging result dir - * @param fileStatus The file status - * @param finalOutputPath Final output path - * @param nf Number format - * @param fileSeq The sequence number - * @throws IOException - */ - private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq) throws IOException { - if (fileStatus.isDirectory()) { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (!fs.exists(finalSubPath)) { - fs.mkdirs(finalSubPath); - } - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); - for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq); - } - } else { - throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); - } - } else { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - finalSubPath 
= new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); - if (!fs.exists(finalSubPath.getParent())) { - fs.mkdirs(finalSubPath.getParent()); - } - if (fs.exists(finalSubPath)) { - throw new IOException("Already exists data file:" + finalSubPath); - } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); - if (success) { - LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } else { - LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } - } - } - } - - private String extractSubPath(Path parentPath, Path childPath) { - String parentPathStr = parentPath.toUri().getPath(); - String childPathStr = childPath.toUri().getPath(); - - if (parentPathStr.length() > childPathStr.length()) { - return null; - } - - int index = childPathStr.indexOf(parentPathStr); - if (index != 0) { - return null; - } - - return childPathStr.substring(parentPathStr.length() + 1); - } - private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index ef30135..6a55de6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -35,10 +35,13 @@ import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.JsonHelper; import 
org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.rewrite.RewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.plan.logical.LogicalNode; @@ -54,10 +57,12 @@ import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; @@ -345,6 +350,8 @@ public class QueryMasterTask extends CompositeService { } public synchronized void startQuery() { + StorageManager sm = null; + LogicalPlan plan = null; try { if (query != null) { LOG.warn("Query already started"); @@ -354,7 +361,29 @@ public class QueryMasterTask extends CompositeService { LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); - LogicalPlan plan = planner.createPlan(queryContext, expr); + plan = planner.createPlan(queryContext, expr); + + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + sm = StorageManager.getStorageManager(systemConf, storeType); + StorageProperty 
storageProperty = sm.getStorageProperty(); + if (storageProperty.isSortedInsert()) { + String tableName = PlannerUtil.getStoreTableName(plan); + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new VerifyException("Can't get table meta data from catalog: " + tableName); + } + List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( + getQueryTaskContext().getQueryContext(), tableDesc); + if (storageSpecifiedRewriteRules != null) { + for (RewriteRule eachRule: storageSpecifiedRewriteRules) { + optimizer.addRuleAfterToJoinOpt(eachRule); + } + } + } + } + optimizer.optimize(queryContext, plan); GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager(); @@ -389,6 +418,15 @@ public class QueryMasterTask extends CompositeService { } catch (Throwable t) { LOG.error(t.getMessage(), t); initError = t; + + if (plan != null && sm != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + try { + sm.rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 6bc185e..75402c2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -266,8 +266,13 @@ public class QueryUnit implements EventHandler<TaskEvent> { List<String> fragmentList = new ArrayList<String>(); for (FragmentProto 
eachFragment : getAllFragments()) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); - fragmentList.add(fileFragment.toString()); + try { + Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment); + fragmentList.add(fragment.toString()); + } catch (Exception e) { + LOG.error(e.getMessage()); + fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage()); + } } queryUnitHistory.setFragments(fragmentList.toArray(new String[]{})); @@ -321,7 +326,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { diskIds = ((FileFragment)fragment).getDiskIds(); } for (int i = 0; i < hosts.length; i++) { - dataLocations.add(new DataLocation(hosts[i], diskIds[i])); + dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i])); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 046b310..c5a41a1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -45,6 +45,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.master.TaskSchedulerContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; @@ -419,7 +420,8 @@ public class Repartitioner { } else { StorageManager 
storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType()); - Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), tableDesc); + Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), + tableDesc, eachScan); if (scanFragments != null) { rightFragments.addAll(scanFragments); } @@ -539,7 +541,7 @@ public class Repartitioner { StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType()); - scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc); + scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan); } if (scanFragments != null) { @@ -649,39 +651,58 @@ public class Repartitioner { SortSpec [] sortSpecs = sortNode.getSortKeys(); Schema sortSchema = new Schema(channel.getShuffleKeys()); + TupleRange[] ranges; + int determinedTaskNum; + // calculate the number of maximum query ranges TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); // If there is an empty table in inner join, it should return zero rows. - if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) { + if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) { return; } TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); - RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); - BigInteger card = partitioner.getTotalCardinality(); - // if the number of the range cardinality is less than the desired number of tasks, - // we set the the number of tasks to the number of range cardinality. 
- int determinedTaskNum; - if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { - LOG.info(subQuery.getId() + ", The range cardinality (" + card - + ") is less then the desired number of tasks (" + maxNum + ")"); - determinedTaskNum = card.intValue(); + if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) { + StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); + CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new IOException("Can't get table meta data from catalog: " + + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); + } + ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType) + .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc, + sortNode.getInSchema(), sortSpecs, + mergedRange); + determinedTaskNum = ranges.length; } else { - determinedTaskNum = maxNum; - } + RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); + BigInteger card = partitioner.getTotalCardinality(); + + // if the number of the range cardinality is less than the desired number of tasks, + // we set the the number of tasks to the number of range cardinality. 
+ if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { + LOG.info(subQuery.getId() + ", The range cardinality (" + card + + ") is less then the desired number of tasks (" + maxNum + ")"); + determinedTaskNum = card.intValue(); + } else { + determinedTaskNum = maxNum; + } - LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + - " sub ranges (total units: " + determinedTaskNum + ")"); - TupleRange [] ranges = partitioner.partition(determinedTaskNum); - if (ranges == null || ranges.length == 0) { - LOG.warn(subQuery.getId() + " no range infos."); - } - TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); - if (LOG.isDebugEnabled()) { - if (ranges != null) { - for (TupleRange eachRange : ranges) { - LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + + " sub ranges (total units: " + determinedTaskNum + ")"); + ranges = partitioner.partition(determinedTaskNum); + if (ranges == null || ranges.length == 0) { + LOG.warn(subQuery.getId() + " no range infos."); + } + TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); + if (LOG.isDebugEnabled()) { + if (ranges != null) { + for (TupleRange eachRange : ranges) { + LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 18d4c28..6676072 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; @@ -37,7 +36,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; @@ -62,7 +61,6 @@ import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; @@ -677,14 +675,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0); - // get default or store type - CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting // if store plan (i.e., CREATE or INSERT OVERWRITE) - StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE); - if (storeTableNode != null) { - storeType = storeTableNode.getStorageType(); + StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); + if (storeType == null) { + // get default or store type + storeType = StoreType.CSV; } + 
schema = channel.getSchema(); meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet()); inputStatistics = statsArray[0]; @@ -1058,7 +1056,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } else { StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType()); - fragments = storageManager.getSplits(scan.getCanonicalName(), table); + fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan); } SubQuery.scheduleFragments(subQuery, fragments);
