TAJO-1346: Create dynamic partitions to CatalogStore by running insert query or CTAS query. (jaehwa)
Closes #630 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d80c32b2 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d80c32b2 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d80c32b2 Branch: refs/heads/master Commit: d80c32b28738a69c3a512108e65ed5c7a3e3adc6 Parents: 00ccb8b Author: JaeHwa Jung <[email protected]> Authored: Fri Jul 31 12:04:02 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Fri Jul 31 12:04:02 2015 +0900 ---------------------------------------------------------------------- .../org/apache/tajo/algebra/AlterTable.java | 29 +- .../tajo/catalog/AbstractCatalogClient.java | 25 ++ .../src/main/proto/CatalogProtocol.proto | 1 + .../org/apache/tajo/catalog/CatalogService.java | 3 + ...xistsAssumedPartitionDirectoryException.java | 28 -- ...biguousPartitionDirectoryExistException.java | 30 ++ .../exception/NoPartitionedTableException.java | 26 -- .../exception/NoSuchPartitionException.java | 37 --- .../exception/NoSuchPartitionKeyException.java | 29 -- .../UndefinedPartitionKeyException.java | 30 ++ .../UndefinedPartitionMethodException.java | 30 ++ .../src/main/proto/CatalogProtos.proto | 12 +- .../tajo/catalog/store/HiveCatalogStore.java | 125 +++++++- .../org/apache/tajo/catalog/CatalogServer.java | 46 ++- .../tajo/catalog/store/AbstractDBStore.java | 284 ++++++++++++------- .../apache/tajo/catalog/store/CatalogStore.java | 3 + .../org/apache/tajo/catalog/store/MemStore.java | 76 +++-- .../src/main/resources/schemas/derby/derby.xml | 9 +- .../main/resources/schemas/mariadb/mariadb.xml | 10 +- .../src/main/resources/schemas/mysql/mysql.xml | 10 +- .../main/resources/schemas/oracle/oracle.xml | 11 +- .../resources/schemas/postgresql/postgresql.xml | 16 +- .../org/apache/tajo/catalog/TestCatalog.java | 5 +- .../java/org/apache/tajo/conf/TajoConf.java | 4 + .../apache/tajo/exception/ErrorMessages.java | 5 + tajo-common/src/main/proto/errors.proto | 2 + 
.../org/apache/tajo/engine/parser/SQLParser.g4 | 4 +- .../apache/tajo/engine/parser/SQLAnalyzer.java | 2 + .../planner/physical/ColPartitionStoreExec.java | 44 ++- .../apache/tajo/master/exec/DDLExecutor.java | 81 +++--- .../java/org/apache/tajo/querymaster/Query.java | 42 ++- .../java/org/apache/tajo/querymaster/Stage.java | 15 + .../apache/tajo/querymaster/TaskAttempt.java | 18 ++ .../apache/tajo/worker/TaskAttemptContext.java | 19 ++ .../java/org/apache/tajo/worker/TaskImpl.java | 4 + tajo-core/src/main/proto/ResourceProtos.proto | 1 + .../tajo/engine/parser/TestSQLAnalyzer.java | 38 +++ .../tajo/engine/query/TestAlterTable.java | 3 +- .../tajo/engine/query/TestTablePartitions.java | 130 ++++++++- .../alter_table_add_partition2.sql | 1 + .../alter_table_drop_partition2.sql | 1 + .../default/alter_table_add_partition_5.sql | 1 + .../default/alter_table_drop_partition_4.sql | 1 + .../testAlterTableAddDropPartition.result | 2 +- .../src/main/conf/catalog-site.xml.template | 4 +- .../main/sphinx/sql_language/alter_table.rst | 7 +- .../org/apache/tajo/plan/LogicalPlanner.java | 2 + .../tajo/plan/logical/AlterTableNode.java | 22 +- .../plan/serder/LogicalNodeDeserializer.java | 3 + tajo-plan/src/main/proto/Plan.proto | 2 + 50 files changed, 1002 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java ---------------------------------------------------------------------- diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java index 260025f..f0bd62a 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java @@ -54,6 +54,11 @@ public class AlterTable extends Expr { @Expose @SerializedName("IsPurge") private boolean 
purge; + @Expose @SerializedName("IfNotExists") + private boolean ifNotExists; + @Expose @SerializedName("IfExists") + private boolean ifExists; + public AlterTable(final String tableName) { super(OpType.AlterTable); this.tableName = tableName; @@ -140,10 +145,26 @@ public class AlterTable extends Expr { this.purge = purge; } + public boolean isIfNotExists() { + return ifNotExists; + } + + public void setIfNotExists(boolean ifNotExists) { + this.ifNotExists = ifNotExists; + } + + public boolean isIfExists() { + return ifExists; + } + + public void setIfExists(boolean ifExists) { + this.ifExists = ifExists; + } + @Override public int hashCode() { return Objects.hashCode(tableName, newTableName, columnName, newColumnName, addNewColumn, alterTableOpType, - columns, values, location, params, purge + columns, values, location, params, purge, ifNotExists, ifExists ); } @@ -161,7 +182,9 @@ public class AlterTable extends Expr { TUtil.checkEquals(values, another.values) && TUtil.checkEquals(location, another.location) && TUtil.checkEquals(params, another.params) && - TUtil.checkEquals(purge, another.purge) + TUtil.checkEquals(purge, another.purge) && + TUtil.checkEquals(ifNotExists, another.ifNotExists) && + TUtil.checkEquals(ifExists, another.ifExists) ; } @@ -183,6 +206,8 @@ public class AlterTable extends Expr { alter.params = new HashMap<String, String>(params); } alter.purge = purge; + alter.ifNotExists = ifNotExists; + alter.ifExists = ifExists; return alter; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 402df0f..52f4b8e 100644 --- 
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -416,6 +416,31 @@ public abstract class AbstractCatalogClient implements CatalogService, Closeable } @Override + public boolean addPartitions(String databaseName, String tableName, List<PartitionDescProto> partitions + , boolean ifNotExists) { + try { + final BlockingInterface stub = getStub(); + final AddPartitionsProto.Builder builder = AddPartitionsProto.newBuilder(); + + TableIdentifierProto.Builder identifier = TableIdentifierProto.newBuilder(); + identifier.setDatabaseName(databaseName); + identifier.setTableName(tableName); + builder.setTableIdentifier(identifier.build()); + + for (PartitionDescProto partition: partitions) { + builder.addPartitionDesc(partition); + } + + builder.setIfNotExists(ifNotExists); + + return isSuccess(stub.addPartitions(null, builder.build())); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return false; + } + } + + @Override public final Collection<String> getAllTableNames(final String databaseName) { try { final BlockingInterface stub = getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index ee74aa0..39201e6 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -124,6 +124,7 @@ service CatalogProtocolService { rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (GetPartitionDescResponse); rpc getPartitionsByTableName(PartitionIdentifierProto) returns (GetPartitionsResponse); rpc 
getAllPartitions(NullProto) returns (GetTablePartitionsResponse); + rpc addPartitions(AddPartitionsProto) returns (ReturnState); rpc createIndex(IndexDescProto) returns (ReturnState); rpc dropIndex(IndexNameProto) returns (ReturnState); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index 7704191..26fc564 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -183,6 +183,9 @@ public interface CatalogService { List<TablePartitionProto> getAllPartitions(); + boolean addPartitions(String databaseName, String tableName, List<PartitionDescProto> partitions + , boolean ifNotExists); + boolean createIndex(IndexDesc index); boolean existIndexByName(String databaseName, String indexName); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java deleted file mode 100644 index df13f82..0000000 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.catalog.exception; - -public class AlreadyExistsAssumedPartitionDirectoryException extends RuntimeException { - - private static final long serialVersionUID = 277182608283894931L; - - public AlreadyExistsAssumedPartitionDirectoryException(String message) { - super(String.format("ERROR: There is a directory which is assumed to be a partitioned directory : %s", message)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java new file mode 100644 index 0000000..0c99a4f --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + + +import org.apache.tajo.error.Errors.ResultCode; + +public class AmbiguousPartitionDirectoryExistException extends CatalogException { + private static final long serialVersionUID = 277182608283894931L; + + public AmbiguousPartitionDirectoryExistException(String columnName) { + super(ResultCode.AMBIGUOUS_PARTITION_DIRECTORY, columnName); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java deleted file mode 100644 index e81f526..0000000 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.catalog.exception; - -public class NoPartitionedTableException extends Exception { - - public NoPartitionedTableException(String databaseName, String relName) { - super(String.format("ERROR: table \"%s.%s\" is not a partitioned table", databaseName, relName)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java deleted file mode 100644 index 70e0d26..0000000 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.catalog.exception; - -public class NoSuchPartitionException extends RuntimeException { - - private static final long serialVersionUID = 277182608283894938L; - - public NoSuchPartitionException(String message) { - super(message); - } - - public NoSuchPartitionException(String tableName, String partitionName) { - super(String.format("ERROR: \"%s\" is not the partition of \"%s\".", partitionName, tableName)); - } - - public NoSuchPartitionException(String databaseName, String tableName, String partitionName) { - super(String.format("ERROR: \"%s\" is not the partition of \"%s.%s\".", partitionName, databaseName, tableName)); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java deleted file mode 100644 index 94574dc..0000000 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.catalog.exception; - -public class NoSuchPartitionKeyException extends RuntimeException { - - private static final long serialVersionUID = 277182608283894939L; - - public NoSuchPartitionKeyException(String tableName, String partitionKey) { - super(String.format("ERROR: \"%s\" column is not the partition key of \"%s\".", - partitionKey, tableName)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java new file mode 100644 index 0000000..5e6d20f --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +import org.apache.tajo.error.Errors.ResultCode; + +public class UndefinedPartitionKeyException extends CatalogException { + + private static final long serialVersionUID = 277182608283894939L; + + public UndefinedPartitionKeyException(String partitionKey) { + super(ResultCode.UNDEFINED_PARTITION_KEY, partitionKey); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java new file mode 100644 index 0000000..5b6eb04 --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +import org.apache.tajo.error.Errors.ResultCode; + +public class UndefinedPartitionMethodException extends CatalogException { + + private static final long serialVersionUID = 277182608283894949L; + + public UndefinedPartitionMethodException(String partitionName) { + super(ResultCode.UNDEFINED_PARTITION_METHOD, partitionName); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 86fee86..a67be97 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -236,15 +236,23 @@ message PartitionMethodProto { required SchemaProto expressionSchema = 4; } +message AddPartitionsProto { + required TableIdentifierProto tableIdentifier = 1; + repeated PartitionDescProto partitionDesc = 2; + required bool ifNotExists = 3; +} + message PartitionDescProto { required string partitionName = 1; repeated PartitionKeyProto partitionKeys = 2; 
optional string path = 3; + optional int32 id = 4; } message PartitionKeyProto { required string columnName = 1; - required string partitionValue = 2; + optional string parentColumnName = 2; + required string partitionValue = 3; } message PartitionIdentifierProto { @@ -396,4 +404,4 @@ message IndexListResponse { message IndexResponse { required ReturnState state = 1; optional IndexDescProto indexDesc = 2; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index b49499f..a0ff5c8 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -55,6 +55,7 @@ import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.apache.thrift.TException; import java.io.IOException; @@ -768,14 +769,82 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { @Override public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName) throws CatalogException { - // TODO - not implemented yet - throw new UnsupportedOperationException(); + org.apache.hadoop.hive.ql.metadata.Table table; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + PartitionMethodDesc partitionMethodDesc = null; + try { + try { + 
client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + } catch (NoSuchObjectException nsoe) { + throw new UndefinedTableException(tableName); + } catch (Exception e) { + throw new TajoInternalError(e); + } + + // set partition keys + List<FieldSchema> partitionKeys = table.getPartitionKeys(); + + if (partitionKeys != null && partitionKeys.size() > 0) { + org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); + StringBuilder sb = new StringBuilder(); + if (partitionKeys.size() > 0) { + for (int i = 0; i < partitionKeys.size(); i++) { + FieldSchema fieldSchema = partitionKeys.get(i); + TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString()); + String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName + + CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName(); + expressionSchema.addColumn(new Column(fieldName, dataType)); + if (i > 0) { + sb.append(","); + } + sb.append(fieldSchema.getName()); + } + partitionMethodDesc = new PartitionMethodDesc( + databaseName, + tableName, + PartitionType.COLUMN, + sb.toString(), + expressionSchema); + } + } else { + throw new UndefinedPartitionMethodException(tableName); + } + } finally { + if(client != null) client.release(); + } + + return partitionMethodDesc.getProto(); } @Override public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException { - // TODO - not implemented yet - throw new UnsupportedOperationException(); + boolean exist = false; + org.apache.hadoop.hive.ql.metadata.Table table; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + try { + client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + } catch (NoSuchObjectException nsoe) { + throw new UndefinedTableException(tableName); + } catch (Exception e) { + throw new 
TajoInternalError(e); + } + + // set partition keys + List<FieldSchema> partitionKeys = table.getPartitionKeys(); + + if (partitionKeys != null && partitionKeys.size() > 0) { + exist = true; + } + } finally { + if(client != null) client.release(); + } + + return exist; } @Override @@ -957,6 +1026,54 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { } @Override + public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions + , boolean ifNotExists) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + List<Partition> addPartitions = TUtil.newList(); + CatalogProtos.PartitionDescProto existingPartition = null; + + try { + client = clientPool.getClient(); + for (CatalogProtos.PartitionDescProto partitionDescProto : partitions) { + existingPartition = getPartition(databaseName, tableName, partitionDescProto.getPartitionName()); + + // Unfortunately, the Hive client's add_partitions doesn't behave as expected: the method never reads the ifNotExists + // parameter. So, if Tajo adds an existing partition to Hive, it will throw AlreadyExistsException. To avoid + // that error, only partitions that do not exist yet may be passed to add_partitions.
+ if (existingPartition != null) { + Partition partition = new Partition(); + partition.setDbName(databaseName); + partition.setTableName(tableName); + + List<String> values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + partition.setValues(values); + + Table table = client.getHiveClient().getTable(databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setLocation(partitionDescProto.getPath()); + partition.setSd(sd); + + addPartitions.add(partition); + } + } + + if (addPartitions.size() > 0) { + client.getHiveClient().add_partitions(addPartitions, true, true); + } + } catch (Exception e) { + throw new TajoInternalError(e); + } finally { + if (client != null) { + client.release(); + } + } + + } + + @Override public List<TableOptionProto> getAllTableProperties() throws CatalogException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 0327367..4ed1ae8 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -950,13 +950,16 @@ public class CatalogServer extends AbstractService { if (store.existPartitionMethod(dbName, tbName)) { PartitionDescProto partitionDesc = store.getPartition(dbName, tbName, partitionName); - - - return GetPartitionDescResponse.newBuilder() + if (partitionDesc != null) { + return GetPartitionDescResponse.newBuilder() .setState(OK) .setPartition(partitionDesc) 
.build(); - + } else { + return GetPartitionDescResponse.newBuilder() + .setState(errUndefinedPartition(partitionName)) + .build(); + } } else { return GetPartitionDescResponse.newBuilder() .setState(errUndefinedPartitionMethod(tbName)) @@ -1065,6 +1068,41 @@ public class CatalogServer extends AbstractService { } @Override + public ReturnState addPartitions(RpcController controller, AddPartitionsProto request) { + + TableIdentifierProto identifier = request.getTableIdentifier(); + String databaseName = identifier.getDatabaseName(); + String tableName = identifier.getTableName(); + + rlock.lock(); + try { + boolean contain; + + contain = store.existDatabase(databaseName); + if (contain) { + contain = store.existTable(databaseName, tableName); + if (contain) { + if (store.existPartitionMethod(databaseName, tableName)) { + store.addPartitions(databaseName, tableName, request.getPartitionDescList(), request.getIfNotExists()); + return OK; + } else { + return errUndefinedPartitionMethod(tableName); + } + } else { + return errUndefinedTable(tableName); + } + } else { + return errUndefinedDatabase(databaseName); + } + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return returnError(t); + } finally { + rlock.unlock(); + } + } + + @Override public ReturnState createIndex(RpcController controller, IndexDescProto indexDesc) { String dbName = indexDesc.getTableIdentifier().getDatabaseName(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 2e9c340..b62624a 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -33,6 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.InternalException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.util.FileUtil; @@ -40,11 +41,7 @@ import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.*; import static org.apache.tajo.catalog.exception.CatalogExceptionUtil.makeCatalogUpgrade; @@ -59,6 +56,23 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo protected final String connectionPassword; protected final String catalogUri; + protected final String insertPartitionSql = "INSERT INTO " + TB_PARTTIONS + + "(" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?, ? , ?)"; + + protected final String insertPartitionKeysSql = "INSERT INTO " + TB_PARTTION_KEYS + "(" + + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", " + + COL_COLUMN_NAME + ", " + COL_PARTITION_VALUE + ")" + + " VALUES ( (" + + " SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? ) " + + " , ?, ?, ?)"; + + protected final String deletePartitionSql = "DELETE FROM " + TB_PARTTIONS + + " WHERE " + COL_PARTITIONS_PK + " = ? "; + + protected final String deletePartitionKeysSql = "DELETE FROM " + TB_PARTTIONS + + " WHERE " + COL_PARTITIONS_PK + " = ? 
"; + private Connection conn; protected XMLCatalogSchemaManager catalogSchemaManager; @@ -935,9 +949,9 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } pstmt = conn.prepareStatement(statSql); - pstmt.setInt(1, tableId); - pstmt.setLong(2, statsProto.getStats().getNumRows()); - pstmt.setLong(3, statsProto.getStats().getNumBytes()); + pstmt.setLong(1, statsProto.getStats().getNumRows()); + pstmt.setLong(2, statsProto.getStats().getNumBytes()); + pstmt.setInt(3, tableId); pstmt.executeUpdate(); } @@ -1007,7 +1021,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo if(partitionDesc == null) { throw new UndefinedPartitionException(partitionName); } - dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName()); + dropPartition(partitionDesc.getId()); break; case SET_PROPERTY: setProperties(tableId, alterTableDescProto.getParams()); @@ -1243,116 +1257,81 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo public void addPartition(int tableId, CatalogProtos.PartitionDescProto partition) throws CatalogException { Connection conn = null; - PreparedStatement pstmt = null; - final String ADD_PARTITION_SQL = - "INSERT INTO " + TB_PARTTIONS - + " (" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?,?,?)"; - - final String ADD_PARTITION_KEYS_SQL = - "INSERT INTO " + TB_PARTTION_KEYS + " (" + COL_PARTITIONS_PK + ", " + COL_COLUMN_NAME + ", " - + COL_PARTITION_VALUE + ") VALUES (?,?,?)"; + PreparedStatement pstmt1 = null, pstmt2 = null; try { - - if (LOG.isDebugEnabled()) { - LOG.debug(ADD_PARTITION_SQL); - } - conn = getConnection(); - pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - pstmt.setInt(1, tableId); - pstmt.setString(2, partition.getPartitionName()); - pstmt.setString(3, partition.getPath()); - pstmt.executeUpdate(); - pstmt.close(); + conn.setAutoCommit(false); - if (partition.getPartitionKeysCount() > 0) { - pstmt = 
conn.prepareStatement(ADD_PARTITION_KEYS_SQL); - int partitionId = getPartitionId(tableId, partition.getPartitionName()); - addPartitionKeys(pstmt, partitionId, partition); - pstmt.executeBatch(); - } - } catch (SQLException se) { - throw new TajoInternalError(se); - } finally { - CatalogUtil.closeQuietly(pstmt); - } - } + pstmt1 = conn.prepareStatement(insertPartitionSql); + pstmt1.setInt(1, tableId); + pstmt1.setString(2, partition.getPartitionName()); + pstmt1.setString(3, partition.getPath()); + pstmt1.executeUpdate(); - public int getPartitionId(int tableId, String partitionName) throws CatalogException { - Connection conn = null; - ResultSet res = null; - PreparedStatement pstmt = null; - int retValue = -1; + pstmt2 = conn.prepareStatement(insertPartitionKeysSql); - try { - String sql = "SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + - " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? "; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); + for (int i = 0; i < partition.getPartitionKeysCount(); i++) { + PartitionKeyProto partitionKey = partition.getPartitionKeys(i); + pstmt2.setInt(1, tableId); + pstmt2.setString(2, partition.getPartitionName()); + pstmt2.setInt(3, tableId); + pstmt2.setString(4, partitionKey.getColumnName()); + pstmt2.setString(5, partitionKey.getPartitionValue()); + pstmt2.addBatch(); + pstmt2.clearParameters(); } + pstmt2.executeBatch(); - conn = getConnection(); - pstmt = conn.prepareStatement(sql); - pstmt.setInt(1, tableId); - pstmt.setString(2, partitionName); - res = pstmt.executeQuery(); - - if (res.next()) { - retValue = res.getInt(1); + if (conn != null) { + conn.commit(); } } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + LOG.error(e, e); + } + } throw new TajoInternalError(se); } finally { - CatalogUtil.closeQuietly(pstmt, res); + CatalogUtil.closeQuietly(pstmt1); + CatalogUtil.closeQuietly(pstmt2); } - return retValue; } - private void 
addPartitionKeys(PreparedStatement pstmt, int partitionId, PartitionDescProto partition) throws - SQLException { - for (int i = 0; i < partition.getPartitionKeysCount(); i++) { - PartitionKeyProto partitionKey = partition.getPartitionKeys(i); - - pstmt.setInt(1, partitionId); - pstmt.setString(2, partitionKey.getColumnName()); - pstmt.setString(3, partitionKey.getPartitionValue()); - - pstmt.addBatch(); - pstmt.clearParameters(); - } - } - - - private void dropPartition(int tableId, String partitionName) throws CatalogException { + private void dropPartition(int partitionId) throws CatalogException { Connection conn = null; - PreparedStatement pstmt = null; + PreparedStatement pstmt1 = null, pstmt2 = null; try { - int partitionId = getPartitionId(tableId, partitionName); - - String sqlDeletePartitionKeys = "DELETE FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? "; - String sqlDeletePartition = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_PARTITIONS_PK + " = ? "; - - if (LOG.isDebugEnabled()) { - LOG.debug(sqlDeletePartitionKeys); - } - conn = getConnection(); - pstmt = conn.prepareStatement(sqlDeletePartitionKeys); - pstmt.setInt(1, partitionId); - pstmt.executeUpdate(); - pstmt.close(); + conn.setAutoCommit(false); - pstmt = conn.prepareStatement(sqlDeletePartition); - pstmt.setInt(1, partitionId); - pstmt.executeUpdate(); + pstmt1 = conn.prepareStatement(deletePartitionKeysSql); + pstmt1.setInt(1, partitionId); + pstmt1.executeUpdate(); + + pstmt2 = conn.prepareStatement(deletePartitionSql); + pstmt2.setInt(1, partitionId); + pstmt2.executeUpdate(); + if (conn != null) { + conn.commit(); + } } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + LOG.error(e, e); + } + } throw new TajoInternalError(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt1); + CatalogUtil.closeQuietly(pstmt2); } } @@ -2054,6 +2033,7 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo if (res.next()) { builder = PartitionDescProto.newBuilder(); + builder.setId(res.getInt(COL_PARTITIONS_PK)); builder.setPath(res.getString("PATH")); builder.setPartitionName(partitionName); setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); @@ -2075,7 +2055,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo PreparedStatement pstmt = null; try { - String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE + String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE + " FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? "; conn = getConnection(); @@ -2169,6 +2149,118 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo return partitions; } + @Override + public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions + , boolean ifNotExists) throws CatalogException { + Connection conn = null; + + // To delete existing partition keys + PreparedStatement pstmt1 = null; + // To delete existing partition; + PreparedStatement pstmt2 = null; + // To insert a partition + PreparedStatement pstmt3 = null; + // To insert partition keys + PreparedStatement pstmt4 = null; + + PartitionDescProto partitionDesc = null; + + try { + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + + conn = getConnection(); + conn.setAutoCommit(false); + + int currentIndex = 0, lastIndex = 0; + + pstmt1 = conn.prepareStatement(deletePartitionKeysSql); + pstmt2 = conn.prepareStatement(deletePartitionSql); + pstmt3 = conn.prepareStatement(insertPartitionSql); + pstmt4 = conn.prepareStatement(insertPartitionKeysSql); + + // Set a batch size like 1000. This avoids SQL injection and also takes care of out of memory issue. 
+ int batchSize = conf.getInt(TajoConf.ConfVars.PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE.varname, 1000); + for(currentIndex = 0; currentIndex < partitions.size(); currentIndex++) { + PartitionDescProto partition = partitions.get(currentIndex); + partitionDesc = getPartition(databaseName, tableName, partition.getPartitionName()); + + // Delete existing partition and partition keys + if (partitionDesc != null) { + if(ifNotExists) { + pstmt1.setInt(1, partitionDesc.getId()); + pstmt1.addBatch(); + pstmt1.clearParameters(); + + pstmt2.setInt(1, partitionDesc.getId()); + pstmt2.addBatch(); + pstmt2.clearParameters(); + } else { + throw new DuplicatePartitionException(partition.getPartitionName()); + } + } + + // Insert partition + pstmt3.setInt(1, tableId); + pstmt3.setString(2, partition.getPartitionName()); + pstmt3.setString(3, partition.getPath()); + pstmt3.addBatch(); + pstmt3.clearParameters(); + + // Insert partition keys + for (int i = 0; i < partition.getPartitionKeysCount(); i++) { + PartitionKeyProto partitionKey = partition.getPartitionKeys(i); + pstmt4.setInt(1, tableId); + pstmt4.setString(2, partition.getPartitionName()); + pstmt4.setInt(3, tableId); + pstmt4.setString(4, partitionKey.getColumnName()); + pstmt4.setString(5, partitionKey.getPartitionValue()); + + pstmt4.addBatch(); + pstmt4.clearParameters(); + } + + // Execute batch + if (currentIndex >= lastIndex + batchSize && lastIndex != currentIndex) { + pstmt1.executeBatch(); + pstmt1.clearBatch(); + pstmt2.executeBatch(); + pstmt2.clearBatch(); + pstmt3.executeBatch(); + pstmt3.clearBatch(); + pstmt4.executeBatch(); + pstmt4.clearBatch(); + lastIndex = currentIndex; + } + } + + // Execute existing batch queries + if (lastIndex != currentIndex) { + pstmt1.executeBatch(); + pstmt2.executeBatch(); + pstmt3.executeBatch(); + pstmt4.executeBatch(); + } + + if (conn != null) { + conn.commit(); + } + } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { 
+ LOG.error(e, e); + } + } + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(pstmt1); + CatalogUtil.closeQuietly(pstmt2); + CatalogUtil.closeQuietly(pstmt3); + CatalogUtil.closeQuietly(pstmt4); + } + } @Override public void createIndex(final IndexDescProto proto) throws CatalogException { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index d8d0103..ef9ddd0 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -106,6 +106,9 @@ public interface CatalogStore extends Closeable { List<TablePartitionProto> getAllPartitions() throws CatalogException; + void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions + , boolean ifNotExists) throws CatalogException; + /**************************** INDEX *******************************/ void createIndex(IndexDescProto proto) throws CatalogException; http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 74b6023..bcd9ce9 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -322,37 +322,13 @@ public class MemStore implements CatalogStore { if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { throw new DuplicatePartitionException(partitionName); } else { - CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); - builder.setPartitionName(partitionName); - builder.setPath(partitionDesc.getPath()); - - if (partitionDesc.getPartitionKeysCount() > 0) { - for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { - CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); - keyBuilder.setColumnName(eachKey.getColumnName()); - keyBuilder.setPartitionValue(eachKey.getPartitionValue()); - builder.addPartitionKeys(keyBuilder.build()); - } - } - - Map<String, CatalogProtos.PartitionDescProto> protoMap = null; - if (!partitions.containsKey(tableName)) { - protoMap = Maps.newHashMap(); - } else { - protoMap = partitions.get(tableName); - } - protoMap.put(partitionName, builder.build()); - partitions.put(tableName, protoMap); + addPartition(partitionDesc, tableName, partitionName); } break; case DROP_PARTITION: partitionDesc = alterTableDescProto.getPartitionDesc(); partitionName = partitionDesc.getPartitionName(); - if(!partitions.containsKey(tableName)) { - throw new UndefinedPartitionException(partitionName); - } else { - partitions.get(tableName).remove(partitionName); - } + dropPartition(databaseName, tableName, partitionName); break; case SET_PROPERTY: KeyValueSet properties = new KeyValueSet(tableDescProto.getMeta().getParams()); @@ -372,6 +348,37 @@ public class MemStore implements CatalogStore { } } + private void addPartition(CatalogProtos.PartitionDescProto partitionDesc, String tableName, String partitionName) { + CatalogProtos.PartitionDescProto.Builder builder = 
CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partitionDesc.getPath()); + + if (partitionDesc.getPartitionKeysCount() > 0) { + for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(eachKey.getColumnName()); + keyBuilder.setPartitionValue(eachKey.getPartitionValue()); + builder.addPartitionKeys(keyBuilder.build()); + } + } + + Map<String, CatalogProtos.PartitionDescProto> protoMap = null; + if (!partitions.containsKey(tableName)) { + protoMap = Maps.newHashMap(); + } else { + protoMap = partitions.get(tableName); + } + protoMap.put(partitionName, builder.build()); + partitions.put(tableName, protoMap); + } + + private void dropPartition(String databaseName, String tableName, String partitionName) { + if(!partitions.containsKey(tableName)) { + throw new UndefinedPartitionException(partitionName); + } else { + partitions.get(tableName).remove(partitionName); + } + } private int getIndexOfColumnToBeRenamed(List<CatalogProtos.ColumnProto> fieldList, String columnName) { int fieldCount = fieldList.size(); @@ -608,6 +615,23 @@ public class MemStore implements CatalogStore { return protos; } + @Override + public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions + , boolean ifNotExists) throws CatalogException { + for(CatalogProtos.PartitionDescProto partition: partitions) { + String partitionName = partition.getPartitionName(); + + if (this.partitions.containsKey(tableName) && this.partitions.get(tableName).containsKey(partitionName)) { + if (ifNotExists) { + dropPartition(databaseName, tableName, partitionName); + } else { + throw new DuplicatePartitionException(partitionName); + } + } + addPartition(partition, tableName, partitionName); + } + } + /* (non-Javadoc) * @see 
CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto) */ http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index 7ed9118..500ff71 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346) * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) @@ -26,7 +27,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="6"> + <tns:base version="7"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -165,7 +166,8 @@ TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE, PARTITION_NAME VARCHAR(767), PATH VARCHAR(1024), - CONSTRAINT C_PARTITION_PK PRIMARY KEY (PARTITION_ID) + CONSTRAINT C_PARTITIONS_PK PRIMARY KEY (PARTITION_ID), + CONSTRAINT C_PARTITIONS_UNIQ UNIQUE (TID, PARTITION_NAME) )]]> </tns:sql> </tns:Object> @@ -176,13 +178,14 @@ <tns:sql><![CDATA[ CREATE TABLE PARTITION_KEYS ( PARTITION_ID INT NOT NULL REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE, + TID INT NOT NULL, COLUMN_NAME VARCHAR(128) NOT NULL, PARTITION_VALUE 
VARCHAR(255) )]]> </tns:sql> </tns:Object> <tns:Object name="PARTITION_KEYS_IDX" type="index" dependsOn="PARTITION_KEYS" order="21"> - <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX ON PARTITION_KEYS(PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> + <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX ON PARTITION_KEYS(TID , COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> </tns:Object> </tns:objects> </tns:base> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml index 8750d2b..4583489 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346) * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) @@ -26,7 +27,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="6"> + <tns:base version="7"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -142,7 +143,7 @@ TID INT NOT NULL, PARTITION_NAME VARCHAR(255) BINARY, PATH VARCHAR(4096) BINARY, - CONSTRAINT CONST_PARTITION_UNIQUE UNIQUE (PARTITION_NAME), + UNIQUE INDEX PARTITION_UNIQUE_IDX 
(TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> </tns:sql> @@ -151,9 +152,10 @@ <tns:sql><![CDATA[ CREATE TABLE PARTITION_KEYS ( PARTITION_ID INT NOT NULL, + TID INT NOT NULL, COLUMN_NAME VARCHAR(255) BINARY NOT NULL, - PARTITION_VALUE VARCHAR(255) BINARY NOT NULL, - UNIQUE INDEX PARTITION_KEYS_IDX (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE), + PARTITION_VALUE VARCHAR(255) BINARY, + INDEX PARTITION_KEYS_IDX (TID, COLUMN_NAME, PARTITION_VALUE), FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE )]]> </tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml index 763d0f7..94c4680 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346) * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) @@ -26,7 +27,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="6"> + <tns:base version="7"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -143,7 +144,7 @@ TID INT NOT NULL, PARTITION_NAME 
VARCHAR(255) BINARY, PATH VARCHAR(4096) BINARY, - CONSTRAINT CONST_PARTITION_UNIQUE UNIQUE (PARTITION_NAME), + UNIQUE INDEX PARTITION_UNIQUE_IDX (TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> </tns:sql> @@ -152,9 +153,10 @@ <tns:sql><![CDATA[ CREATE TABLE PARTITION_KEYS ( PARTITION_ID INT NOT NULL, + TID INT NOT NULL, COLUMN_NAME VARCHAR(255) BINARY NOT NULL, - PARTITION_VALUE VARCHAR(255) BINARY NOT NULL, - UNIQUE INDEX PARTITION_KEYS_IDX (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE), + PARTITION_VALUE VARCHAR(255) BINARY, + INDEX PARTITION_KEYS_IDX (TID, COLUMN_NAME, PARTITION_VALUE), FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE )]]> </tns:sql> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml index 3d8ef30..fb715ae 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346) * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 4 - 2015-03-27: Partition Schema (TAJO-1284) @@ -26,7 +27,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="6"> + <tns:base version="7"> <tns:objects> <tns:Object 
order="0" type="table" name="meta"> <tns:sql><![CDATA[ @@ -218,10 +219,14 @@ END;]]> </tns:sql> </tns:Object> - <tns:Object order="21" type="table" name="PARTITION_KEYS"> + <tns:Object order="21" type="index" name="PARTITIONS_UNIQUE_IDX" dependsOn="PARTITIONS"> + <tns:sql><![CDATA[CREATE UNIQUE INDEX PARTITIONS_UNIQUE_IDX on PARTITIONS (TID, PARTITION_NAME)]]></tns:sql> + </tns:Object> + <tns:Object order="22" type="table" name="PARTITION_KEYS"> <tns:sql><![CDATA[ CREATE TABLE PARTITION_KEYS ( PARTITION_ID INT NOT NULL, + TID INT NOT NULL, COLUMN_NAME VARCHAR2(255) NOT NULL, PARTITION_VALUE VARCHAR(255) NULL, FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE @@ -229,7 +234,7 @@ </tns:sql> </tns:Object> <tns:Object order="23" type="index" name="PARTITION_KEYS_IDX" dependsOn="PARTITION_KEYS"> - <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> + <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (TID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> </tns:Object> </tns:objects> </tns:base> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index 0dd72b9..b7b94fc 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -21,6 +21,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 7 - 2015-07-30: Add a column and index for partition keys 
(TAJO-1346) * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616) @@ -29,7 +30,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="6"> + <tns:base version="7"> <tns:objects> <tns:Object name="META" type="table" order="0"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -161,23 +162,26 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. PARTITION_NAME VARCHAR(128), PARTITION_VALUE VARCHAR(1024), PATH VARCHAR(4096), - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, - CONSTRAINT C_PARTITION_UNIQUE UNIQUE (PARTITION_NAME) + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> </tns:sql> </tns:Object> - <tns:Object name="PARTITION_KEYS" type="table" order="16"> + <tns:Object name="PARTITIONS_UNIQUE_IDX" type="index" order="16" dependsOn="PARTITIONS"> + <tns:sql><![CDATA[CREATE UNIQUE INDEX PARTITIONS_UNIQUE_IDX on PARTITIONS (TID, PARTITION_NAME)]]></tns:sql> + </tns:Object> + <tns:Object name="PARTITION_KEYS" type="table" order="17"> <tns:sql><![CDATA[ CREATE TABLE PARTITION_KEYS ( PARTITION_ID INT NOT NULL, + TID INT NOT NULL, COLUMN_NAME VARCHAR(255) NOT NULL, PARTITION_VALUE VARCHAR(255) NULL, FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE )]]> </tns:sql> </tns:Object> - <tns:Object name="PARTITION_KEYS_IDX" type="index" order="17" dependsOn="PARTITION_KEYS"> - <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> + <tns:Object name="PARTITION_KEYS_IDX" type="index" order="18" dependsOn="PARTITION_KEYS"> + <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (TID, 
COLUMN_NAME, PARTITION_VALUE)]]></tns:sql> </tns:Object> </tns:objects> </tns:base> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index 1206bfa..caa85e8 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -22,7 +22,6 @@ import com.google.common.collect.Sets; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; -import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.exception.UndefinedFunctionException; import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -952,11 +951,11 @@ public class TestCatalog { List<PartitionKeyProto> partitionKeyList = new ArrayList<PartitionKeyProto>(); for(int i = 0; i < partitionNames.length; i++) { String columnName = partitionNames[i].split("=")[0]; + String partitionValue = partitionNames[i].split("=")[1]; PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder(); - builder.setColumnName(partitionNames[i]); + builder.setColumnName(partitionValue); builder.setPartitionValue(columnName); - partitionKeyList.add(builder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 4b4de62..1f7f2fa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -290,6 +290,10 @@ public class TajoConf extends Configuration { PYTHON_CODE_DIR("tajo.function.python.code-dir", ""), PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""), + // Partition + PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000), + + ///////////////////////////////////////////////////////////////////////////////// // User Session Configuration // http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java index 3f05f44..d50164d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java @@ -63,6 +63,7 @@ public class ErrorMessages { ADD_MESSAGE(UNDEFINED_COLUMN, "column '%s' does not exist", 1); ADD_MESSAGE(UNDEFINED_FUNCTION, "function does not exist: %s", 1); ADD_MESSAGE(UNDEFINED_PARTITION, "partition '%s' does not exist", 1); + ADD_MESSAGE(UNDEFINED_PARTITION_KEY, "'%s' column is not the partition key", 1); ADD_MESSAGE(UNDEFINED_OPERATOR, "operator does not exist: '%s'", 1); ADD_MESSAGE(UNDEFINED_INDEX_FOR_TABLE, "index ''%s' does not exist", 1); ADD_MESSAGE(UNDEFINED_INDEX_FOR_COLUMNS, "index does not exist for '%s' columns of '%s' table", 2); @@ -94,6 +95,10 @@ public class ErrorMessages { ADD_MESSAGE(MDC_NO_MATCHED_DATATYPE, "no matched type for %s", 1); ADD_MESSAGE(UNKNOWN_DATAFORMAT, "Unknown data format: '%s'", 1); + + ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, 
"There is a directory which is assumed to be a partitioned directory" + + " : '%s'", 1); + } private static void ADD_MESSAGE(ResultCode code, String msgFormat) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/proto/errors.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto index 5e2ecc0..0bc0069 100644 --- a/tajo-common/src/main/proto/errors.proto +++ b/tajo-common/src/main/proto/errors.proto @@ -109,6 +109,7 @@ enum ResultCode { UNDEFINED_PARTITION = 520; // ? UNDEFINED_PARTITION_METHOD = 521; // ? UNDEFINED_OPERATOR = 522; // SQLState: 42883 (=UNDEFINED_FUNCTION) + UNDEFINED_PARTITION_KEY = 523; // ? DUPLICATE_TABLESPACE = 531; DUPLICATE_DATABASE = 532; // SQLState: 42P04 @@ -123,6 +124,7 @@ enum ResultCode { AMBIGUOUS_TABLE = 541; // ? AMBIGUOUS_COLUMN = 542; // SQLState: 42702; AMBIGUOUS_FUNCTION = 543; // SQLState: 42725; + AMBIGUOUS_PARTITION_DIRECTORY = 544; // ? CANNOT_CAST = 601; // SQLState: 42846 - Cast from source type to target type is not supported. GROUPING_ERROR = 602; // SQLState: 42803 http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index b07fb8f..098994d 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -1611,8 +1611,8 @@ alter_table_statement : ALTER TABLE table_name RENAME TO table_name | ALTER TABLE table_name RENAME COLUMN column_name TO column_name | ALTER TABLE table_name ADD COLUMN field_element - | ALTER TABLE table_name (if_not_exists)? 
ADD PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)? - | ALTER TABLE table_name (if_exists)? DROP PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)? + | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)? + | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)? | ALTER TABLE table_name SET PROPERTY property_list ; http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index c50d5be..ffe5d2e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -1886,6 +1886,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { alterTable.setLocation(path); } alterTable.setPurge(checkIfExist(ctx.PURGE())); + alterTable.setIfNotExists(checkIfExist(ctx.if_not_exists())); + alterTable.setIfExists(checkIfExist(ctx.if_exists())); } if (checkIfExist(ctx.property_list())) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 33714db..a8a1c78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -29,6 +29,8 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.InsertNode; @@ -135,7 +137,9 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { LOG.info("Path " + lastFileName.getParent() + " already exists!"); } else { fs.mkdirs(lastFileName.getParent()); - LOG.info("Add subpartition path directory :" + lastFileName.getParent()); + if (LOG.isDebugEnabled()) { + LOG.debug("Add subpartition path directory :" + lastFileName.getParent()); + } } if (fs.exists(lastFileName)) { @@ -146,9 +150,47 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { openAppender(0); + addPartition(partition); + return appender; } + /** + * Add partition information to TableStats for storing to CatalogStore. + * + * @param partition partition name + * @throws IOException + */ + private void addPartition(String partition) throws IOException { + PartitionDescProto.Builder builder = PartitionDescProto.newBuilder(); + builder.setPartitionName(partition); + + String[] partitionKeyPairs = partition.split("/"); + + for(int i = 0; i < partitionKeyPairs.length; i++) { + String partitionKeyPair = partitionKeyPairs[i]; + String[] split = partitionKeyPair.split("="); + + PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(split[0]); + keyBuilder.setPartitionValue(split[1]); + + builder.addPartitionKeys(keyBuilder.build()); + } + + if (this.plan.getUri() == null) { + // In CTAS, the uri would be null. 
So, + String[] split = CatalogUtil.splitTableName(plan.getTableName()); + int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length(); + String outputPath = storeTablePath.toString().substring(0, endIndex); + builder.setPath(outputPath + "/" + partition); + } else { + builder.setPath(this.plan.getUri().toString() + "/" + partition); + } + + context.addPartition(builder.build()); + } + public void openAppender(int suffixId) throws IOException { Path actualFilePath = lastFileName; if (suffixId > 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index a535f94..048bab2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -562,61 +562,70 @@ public class DDLExecutor { boolean duplicatedPartition = true; try { catalog.getPartition(databaseName, simpleTableName, pair.getSecond()); - } catch (UndefinedPartitionException npe) { + } catch (UndefinedPartitionException e) { duplicatedPartition = false; } - if (duplicatedPartition) { - throw new DuplicatePartitionException(pair.getSecond()); - } - if (alterTable.getLocation() != null) { - partitionPath = new Path(alterTable.getLocation()); - } else { - // If location is not specified, the partition's location will be set using the table location. 
- partitionPath = new Path(desc.getUri().toString(), pair.getSecond()); - alterTable.setLocation(partitionPath.toString()); - } + if (duplicatedPartition && !alterTable.isIfNotExists()) { + throw new DuplicatePartitionException(pair.getSecond()); + } else if (!duplicatedPartition) { + if (alterTable.getLocation() != null) { + partitionPath = new Path(alterTable.getLocation()); + } else { + // If location is not specified, the partition's location will be set using the table location. + partitionPath = new Path(desc.getUri().toString(), pair.getSecond()); + alterTable.setLocation(partitionPath.toString()); + } - FileSystem fs = partitionPath.getFileSystem(context.getConf()); + FileSystem fs = partitionPath.getFileSystem(context.getConf()); - // If there is a directory which was assumed to be a partitioned directory and users don't input another - // location, this will throw exception. - Path assumedDirectory = new Path(desc.getUri().toString(), pair.getSecond()); + // If there is a directory which was assumed to be a partitioned directory and users don't input another + // location, this will throw exception. 
+ Path assumedDirectory = new Path(desc.getUri().toString(), pair.getSecond()); - if (fs.exists(assumedDirectory) && !assumedDirectory.equals(partitionPath)) { - throw new AlreadyExistsAssumedPartitionDirectoryException(assumedDirectory.toString()); - } + if (fs.exists(assumedDirectory) && !assumedDirectory.equals(partitionPath)) { + throw new AmbiguousPartitionDirectoryExistException(assumedDirectory.toString()); + } - catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), - alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION)); + catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), + alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION)); - // If the partition's path doesn't exist, this would make the directory by force. - if (!fs.exists(partitionPath)) { - fs.mkdirs(partitionPath); + // If the partition's path doesn't exist, this would make the directory by force. 
+ if (!fs.exists(partitionPath)) { + fs.mkdirs(partitionPath); + } } + break; case DROP_PARTITION: ensureColumnPartitionKeys(qualifiedName, alterTable.getPartitionColumns()); pair = CatalogUtil.getPartitionKeyNamePair(alterTable.getPartitionColumns(), alterTable.getPartitionValues()); - partitionDescProto = catalog.getPartition(databaseName, simpleTableName, pair.getSecond()); - if (partitionDescProto == null) { - throw new NoSuchPartitionException(tableName, pair.getSecond()); + boolean undefinedPartition = false; + try { + partitionDescProto = catalog.getPartition(databaseName, simpleTableName, pair.getSecond()); + } catch (UndefinedPartitionException e) { + undefinedPartition = true; } - catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), - alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION)); + if (undefinedPartition && !alterTable.isIfExists()) { + throw new UndefinedPartitionException(pair.getSecond()); + } else if (!undefinedPartition) { + catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), + alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION)); - // When dropping partition on an managed table, the data will be delete from file system. - if (!desc.isExternal()) { - deletePartitionPath(partitionDescProto); - } else { - // When dropping partition on an external table, the data in the table will NOT be deleted from the file - // system. But if PURGE is specified, the partition data will be deleted. - if (alterTable.isPurge()) { + // When dropping partition on an managed table, the data will be delete from file system. + if (!desc.isExternal()) { deletePartitionPath(partitionDescProto); + } else { + // When dropping partition on an external table, the data in the table will NOT be deleted from the file + // system. But if PURGE is specified, the partition data will be deleted. 
+ if (alterTable.isPurge()) { + deletePartitionPath(partitionDescProto); + } } } + break; default: //TODO @@ -634,7 +643,7 @@ public class DDLExecutor { private boolean ensureColumnPartitionKeys(String tableName, String[] columnNames) { for(String columnName : columnNames) { if (!ensureColumnPartitionKeys(tableName, columnName)) { - throw new NoSuchPartitionKeyException(tableName, columnName); + throw new UndefinedPartitionKeyException(columnName); } } return true; http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index e3629c7..f351143 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -34,8 +34,8 @@ import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; @@ -331,6 +331,17 @@ public class Query implements EventHandler<QueryEvent> { return queryHistory; } + public List<PartitionDescProto> getPartitions() { + List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>(); + for(Stage eachStage : getStages()) { + if (!eachStage.getPartitions().isEmpty()) { + partitions.addAll(eachStage.getPartitions()); + } + } + + return partitions; + } + public List<String> getDiagnostics() { readLock.lock(); try { @@ -493,6 +504,35 @@ public class Query implements 
EventHandler<QueryEvent> { QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); + TableDesc desc = query.getResultDesc(); + + // If there is partitions + List<PartitionDescProto> partitions = query.getPartitions(); + if (partitions!= null && !partitions.isEmpty()) { + + String databaseName, simpleTableName; + + if (CatalogUtil.isFQTableName(desc.getName())) { + String[] split = CatalogUtil.splitFQTableName(desc.getName()); + databaseName = split[0]; + simpleTableName = split[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = desc.getName(); + } + + // Store partitions to CatalogStore using alter table statement. + boolean result = catalog.addPartitions(databaseName, simpleTableName, partitions, true); + if (result) { + LOG.info(String.format("Complete adding for partition %s", partitions.size())); + } else { + LOG.info(String.format("Incomplete adding for partition %s", partitions.size())); + } + } else { + LOG.info("Can't find partitions for adding."); + } + + } catch (Exception e) { query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR;
