TAJO-1284: Add alter partition method to CatalogStore. (jaehwa) Closes #448
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cad54428 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cad54428 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cad54428 Branch: refs/heads/index_support Commit: cad54428dd803cdb8caf3d51a9ba7e1d83a99b01 Parents: a01292f Author: JaeHwa Jung <[email protected]> Authored: Fri Mar 27 11:01:32 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Fri Mar 27 11:01:32 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 45 ++- .../src/main/proto/CatalogProtocol.proto | 7 +- .../org/apache/tajo/catalog/AlterTableDesc.java | 21 +- .../org/apache/tajo/catalog/AlterTableType.java | 2 +- .../apache/tajo/catalog/CatalogConstants.java | 5 + .../org/apache/tajo/catalog/CatalogService.java | 9 +- .../AlreadyExistsPartitionException.java | 33 ++ .../exception/NoSuchPartitionException.java | 39 +++ .../tajo/catalog/partition/PartitionDesc.java | 97 +++--- .../tajo/catalog/partition/PartitionKey.java | 147 ++++++++ .../src/main/proto/CatalogProtos.proto | 30 +- .../tajo/catalog/store/HCatalogStore.java | 125 +++++-- .../tajo/catalog/store/TestHCatalogStore.java | 77 ++++- .../org/apache/tajo/catalog/CatalogServer.java | 100 ++++-- .../InfoSchemaMetadataDictionary.java | 2 + .../PartitionKeysTableDescriptor.java | 46 +++ .../dictionary/PartitionsTableDescriptor.java | 3 +- .../tajo/catalog/store/AbstractDBStore.java | 337 +++++++++++++------ .../store/AbstractMySQLMariaDBStore.java | 14 + .../apache/tajo/catalog/store/CatalogStore.java | 14 +- .../org/apache/tajo/catalog/store/MemStore.java | 109 ++++-- .../src/main/resources/schemas/derby/derby.xml | 27 +- .../schemas/mariadb/partition_keys.sql | 6 + .../resources/schemas/mariadb/partitions.sql | 13 +- .../resources/schemas/mysql/partition_keys.sql | 6 + .../main/resources/schemas/mysql/partitions.sql | 
13 +- .../main/resources/schemas/oracle/oracle.xml | 45 ++- .../resources/schemas/postgresql/postgresql.xml | 16 +- .../org/apache/tajo/catalog/TestCatalog.java | 73 +++- .../NonForwardQueryResultSystemScanner.java | 6 +- .../tajo/jdbc/util/TestQueryStringDecoder.java | 14 +- 32 files changed, 1180 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 575a389..029735f 100644 --- a/CHANGES +++ b/CHANGES @@ -108,6 +108,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1284: Add alter partition method to CatalogStore. (jaehwa) + TAJO-1392: Resolve findbug warnings on Tajo Plan Module. (jihun) TAJO-1393: Resolve findbug warnings on Tajo Cli Module. http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index d8350a3..458d6e0 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -410,7 +410,50 @@ public abstract class AbstractCatalogClient implements CatalogService { return false; } } - + + @Override + public final PartitionDescProto getPartition(final String databaseName, final String tableName, + final String partitionName) { + try { + return new ServerCallable<PartitionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + public PartitionDescProto call(NettyClientBase client) throws 
ServiceException { + + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); + builder.setPartitionName(partitionName); + + CatalogProtocolService.BlockingInterface stub = getStub(client); + return stub.getPartitionByPartitionName(null, builder.build()); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) { + try { + return new ServerCallable<List<PartitionDescProto>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, + false) { + public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException { + + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); + + CatalogProtocolService.BlockingInterface stub = getStub(client); + PartitionsProto response = stub.getPartitionsByTableName(null, builder.build()); + return response.getPartitionList(); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } @Override public List<TablePartitionProto> getAllPartitions() { try { http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index cae5d88..5ace32e 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -56,11 +56,8 @@ service CatalogProtocolService { rpc existPartitionMethod(TableIdentifierProto) returns (BoolProto); 
rpc dropPartitionMethod(TableIdentifierProto) returns (BoolProto); - rpc addPartitions(PartitionsProto) returns (BoolProto); - rpc addPartition(PartitionDescProto) returns (BoolProto); - rpc getPartitionByPartitionName(StringProto) returns (PartitionDescProto); - rpc getPartitionsByTableName(StringProto) returns (PartitionsProto); - rpc delAllPartitions(StringProto) returns (PartitionsProto); + rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (PartitionDescProto); + rpc getPartitionsByTableName(PartitionIdentifierProto) returns (PartitionsProto); rpc getAllPartitions(NullProto) returns (GetTablePartitionsProto); rpc createIndex(IndexDescProto) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java index 69d5be4..f1265fb 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java @@ -21,6 +21,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; @@ -40,7 +41,9 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj @Expose protected String newColumnName; //optional @Expose - protected Column addColumn = null; //optiona + protected Column addColumn = null; //optional + @Expose + protected PartitionDesc 
partitionDesc; //optional public AlterTableDesc() { } @@ -94,6 +97,10 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj this.alterTableType = alterTableType; } + public PartitionDesc getPartitionDesc() { return partitionDesc; } + + public void setPartitionDesc(PartitionDesc partitionDesc) { this.partitionDesc = partitionDesc; } + @Override public String toString() { Gson gson = new GsonBuilder().setPrettyPrinting(). @@ -109,6 +116,7 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj newAlter.newTableName = newTableName; newAlter.columnName = newColumnName; newAlter.addColumn = addColumn; + newAlter.partitionDesc = partitionDesc; return newAlter; } @@ -147,8 +155,19 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj case ADD_COLUMN: builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_COLUMN); break; + case ADD_PARTITION: + builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_PARTITION); + break; + case DROP_PARTITION: + builder.setAlterTableType(CatalogProtos.AlterTableType.DROP_PARTITION); + break; default: } + + if (null != this.partitionDesc) { + builder.setPartitionDesc(partitionDesc.getProto()); + } + return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java index 0b7639c..7e3be91 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java @@ -18,5 +18,5 @@ package org.apache.tajo.catalog; public enum AlterTableType { - RENAME_TABLE, 
RENAME_COLUMN, ADD_COLUMN + RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION } http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index a8c5c9b..8265e38 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -48,11 +48,16 @@ public class CatalogConstants { public static final String TB_STATISTICS = "STATS"; public static final String TB_PARTITION_METHODS = "PARTITION_METHODS"; public static final String TB_PARTTIONS = "PARTITIONS"; + public static final String TB_PARTTION_KEYS = "PARTITION_KEYS"; public static final String COL_TABLESPACE_PK = "SPACE_ID"; public static final String COL_DATABASES_PK = "DB_ID"; public static final String COL_TABLES_PK = "TID"; public static final String COL_TABLES_NAME = "TABLE_NAME"; + + public static final String COL_PARTITIONS_PK = "PARTITION_ID"; + public static final String COL_COLUMN_NAME = "COLUMN_NAME"; + public static final String COL_PARTITION_VALUE = "PARTITION_VALUE"; public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema"; } http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index 2a5d890..86b773b 100644 
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -19,6 +19,7 @@ package org.apache.tajo.catalog; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; @@ -183,7 +184,11 @@ public interface CatalogService { PartitionMethodDesc getPartitionMethod(String databaseName, String tableName); boolean existPartitionMethod(String databaseName, String tableName); - + + CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, String partitionName); + + List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName); + List<TablePartitionProto> getAllPartitions(); boolean createIndex(IndexDesc index); @@ -221,4 +226,6 @@ public interface CatalogService { boolean updateTableStats(UpdateTableStatsProto stats); + + } http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java new file mode 100644 index 0000000..ab6144f --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +public class AlreadyExistsPartitionException extends RuntimeException { + + private static final long serialVersionUID = 277182608283894930L; + + public AlreadyExistsPartitionException(String message) { + super(message); + } + + public AlreadyExistsPartitionException(String databaseName, String tableName, String partitionName) { + super(String.format("ERROR: \"%s already exist in \"%s.%s\"", partitionName, databaseName, tableName)); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java new file mode 100644 index 0000000..45c9299 --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.function.FunctionUtil; +import org.codehaus.jackson.schema.JsonSerializableSchema; + +import java.util.Collection; + +public class NoSuchPartitionException extends RuntimeException { + + private static final long serialVersionUID = 277182608283894938L; + + public NoSuchPartitionException(String message) { + super(message); + } + + public NoSuchPartitionException(String databaseName, String tableName, String partitionName) { + super(String.format("ERROR: \"%s\" does not exist in \"%s.%s\".", partitionName, databaseName, tableName)); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java index d775ba8..b6d883d 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java +++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java @@ -19,20 +19,44 @@ package org.apache.tajo.catalog.partition; import com.google.common.base.Objects; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.json.CatalogGsonHelper; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; +import java.util.ArrayList; +import java.util.List; + /** - * <code>PartitionDesc</code> presents a table partition. + * This represents each partition of a column-partitioned table. + * Each partition can have its own name, partition path, and column name and partition value pairs. + * + * For example, consider you have a partitioned table as follows: + * + * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text, + * gender text) USING RCFILE LOCATION '/tajo/data/table1'; + * + * Then, its data will be stored on HDFS as follows: + * - /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - /tajo/data/table1/dt=20150301/phone=1300/gender=f + * - /tajo/data/table1/dt=20150302/phone=1500/gender=m + * - /tajo/data/table1/dt=20150302/phone=1500/gender=f + * + * In the example above, the first directory can be represented with this class as follows: + * - partitionName : dt=20150301/phone=1300/gender=m + * - path: /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - partitionKeys: + * dt=20150301, phone=1300, gender=m + * */ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescProto>, Cloneable, GsonObject { - @Expose protected String partitionName; // optional - @Expose protected int ordinalPosition; // required - @Expose protected String partitionValue; // optional - @Expose protected String path; // optional + @Expose protected String partitionName; + @Expose protected List<PartitionKey> 
partitionKeys; + @Expose protected String path; //optional private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); @@ -41,8 +65,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro public PartitionDesc(PartitionDesc partition) { this.partitionName = partition.partitionName; - this.ordinalPosition = partition.ordinalPosition; - this.partitionValue = partition.partitionValue; + this.partitionKeys = partition.partitionKeys; this.path = partition.path; } @@ -50,46 +73,44 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro if(proto.hasPartitionName()) { this.partitionName = proto.getPartitionName(); } - this.ordinalPosition = proto.getOrdinalPosition(); - if(proto.hasPartitionValue()) { - this.partitionValue = proto.getPartitionValue(); + + this.partitionKeys = new ArrayList<PartitionKey>(); + for(CatalogProtos.PartitionKeyProto keyProto : proto.getPartitionKeysList()) { + PartitionKey partitionKey = new PartitionKey(keyProto); + this.partitionKeys.add(partitionKey); } + if(proto.hasPath()) { this.path = proto.getPath(); } } - public void setName(String partitionName) { - this.partitionName = partitionName; - } - public String getName() { + public String getPartitionName() { return partitionName; } - - public void setOrdinalPosition(int ordinalPosition) { - this.ordinalPosition = ordinalPosition; - } - public int getOrdinalPosition() { - return ordinalPosition; + public void setPartitionName(String partitionName) { + this.partitionName = partitionName; } - public void setPartitionValue(String partitionValue) { - this.partitionValue = partitionValue; + public List<PartitionKey> getPartitionKeys() { + return partitionKeys; } - public String getPartitionValue() { - return partitionValue; + + public void setPartitionKeys(List<PartitionKey> partitionKeys) { + this.partitionKeys = partitionKeys; } public void setPath(String path) { this.path = path; } + public 
String getPath() { return path; } public int hashCode() { - return Objects.hashCode(partitionName, ordinalPosition, partitionValue, path); + return Objects.hashCode(partitionName, partitionKeys, path); } public boolean equals(Object o) { @@ -98,10 +119,9 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro boolean eq = ((partitionName != null && another.partitionName != null && partitionName.equals(another.partitionName)) || (partitionName == null && another.partitionName == null)); - eq = eq && (ordinalPosition == another.ordinalPosition); - eq = eq && ((partitionValue != null && another.partitionValue != null - && partitionValue.equals(another.partitionValue)) - || (partitionValue == null && another.partitionValue == null)); + eq = eq && ((partitionKeys != null && another.partitionKeys != null + && partitionKeys.equals(another.partitionKeys)) + || (partitionKeys == null && another.partitionKeys == null)); eq = eq && ((path != null && another.path != null && path.equals(another.path)) || (path == null && another.path == null)); return eq; @@ -117,13 +137,14 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro } if(this.partitionName != null) { - builder.setPartitionName(partitionName); + builder.setPartitionName(this.partitionName); } - builder.setOrdinalPosition(this.ordinalPosition); - - if (this.partitionValue != null) { - builder.setPartitionValue(this.partitionValue); + builder.clearPartitionKeys(); + if (this.partitionKeys != null) { + for(PartitionKey partitionKey : this.partitionKeys) { + builder.addPartitionKeys(partitionKey.getProto()); + } } if(this.path != null) { @@ -134,8 +155,9 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro } public String toString() { - StringBuilder sb = new StringBuilder("name: " + partitionName); - return sb.toString(); + Gson gson = new GsonBuilder().setPrettyPrinting(). 
+ excludeFieldsWithoutExposeAnnotation().create(); + return gson.toJson(this); } @Override @@ -151,8 +173,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro PartitionDesc desc = (PartitionDesc) super.clone(); desc.builder = CatalogProtos.PartitionDescProto.newBuilder(); desc.partitionName = partitionName; - desc.ordinalPosition = ordinalPosition; - desc.partitionValue = partitionValue; + desc.partitionKeys = partitionKeys; desc.path = path; return desc; http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java new file mode 100644 index 0000000..085598b --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.catalog.partition; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.json.GsonObject; +import org.apache.tajo.util.TUtil; + + +/** + * This represents column name and partition value pairs of a column-partitioned table. + * + * For example, consider you have a partitioned table as follows: + * + * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text, + * gender text) USING RCFILE LOCATION '/tajo/data/table1'; + * + * Then, its data will be stored on HDFS as follows: + * - /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - /tajo/data/table1/dt=20150301/phone=1300/gender=f + * - /tajo/data/table1/dt=20150302/phone=1500/gender=m + * - /tajo/data/table1/dt=20150302/phone=1500/gender=f + * + * In the example above, the first directory can be represented with this class as follows: + * The first pair: column name = dt, partition value = 20150301 + * The second pair: column name = phone, partition value = 1300 + * The third pair: column name = gender, partition value = m + * + */ +public class PartitionKey implements ProtoObject<CatalogProtos.PartitionKeyProto>, Cloneable, GsonObject { + @Expose protected String columnName; // required + @Expose protected String partitionValue; // required + + private CatalogProtos.PartitionKeyProto.Builder builder = CatalogProtos.PartitionKeyProto.newBuilder(); + + public PartitionKey() { + } + + public PartitionKey(String columnName, String partitionValue) { + this.columnName = columnName; + this.partitionValue = partitionValue; + } + + public PartitionKey(PartitionKey partition) { + this.columnName = partition.columnName; + this.partitionValue = partition.partitionValue; + } + + public PartitionKey(CatalogProtos.PartitionKeyProto proto) { + if (proto.hasColumnName()) 
{ + this.columnName = proto.getColumnName(); + } + if (proto.hasPartitionValue()) { + this.partitionValue = proto.getPartitionValue(); + } + } + + public String getPartitionValue() { + return partitionValue; + } + + public void setPartitionValue(String partitionValue) { + this.partitionValue = partitionValue; + } + + public String getColumnName() { + return columnName; + } + + public void setColumnName(String columnName) { + this.columnName = columnName; + } + + public int hashCode() { + return Objects.hashCode(partitionValue, columnName); + } + + public boolean equals(Object o) { + if (o instanceof PartitionKey) { + PartitionKey another = (PartitionKey) o; + return TUtil.checkEquals(columnName, another.columnName) && + TUtil.checkEquals(partitionValue, another.partitionValue); + } + return false; + } + + @Override + public CatalogProtos.PartitionKeyProto getProto() { + if (builder == null) { + builder = CatalogProtos.PartitionKeyProto.newBuilder(); + } + + if (this.columnName != null) { + builder.setColumnName(this.columnName); + } + + if (this.partitionValue != null) { + builder.setPartitionValue(this.partitionValue); + } + + return builder.build(); + } + + public String toString() { + StringBuilder sb = new StringBuilder("name: " + partitionValue); + return sb.toString(); + } + + @Override + public String toJson() { + return CatalogGsonHelper.toJson(this, PartitionKey.class); + } + + public static PartitionKey fromJson(String strVal) { + return strVal != null ? 
CatalogGsonHelper.fromJson(strVal, PartitionKey.class) : null; + } + + public Object clone() throws CloneNotSupportedException { + PartitionKey desc = (PartitionKey) super.clone(); + desc.builder = CatalogProtos.PartitionKeyProto.newBuilder(); + desc.partitionValue = partitionValue; + desc.columnName = columnName; + + return desc; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index a204685..3abd840 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -58,6 +58,8 @@ enum AlterTableType { RENAME_TABLE = 0; RENAME_COLUMN = 1; ADD_COLUMN = 2; + ADD_PARTITION = 3; + DROP_PARTITION = 4; } message ColumnProto { @@ -184,11 +186,10 @@ message TableOptionProto { } message TablePartitionProto { - required int32 pid = 1; + required int32 partition_id = 1; required int32 tid = 2; optional string partitionName = 3; - required int32 ordinalPosition = 4; - optional string path = 5; + optional string path = 4; } message GetIndexByColumnRequest { @@ -281,8 +282,7 @@ message SortSpecProto { message PartitionsProto { - required TableIdentifierProto tableIdentifier = 1; - repeated PartitionDescProto partition = 2; + repeated PartitionDescProto partition = 1; } message PartitionMethodProto { @@ -293,10 +293,21 @@ message PartitionMethodProto { } message PartitionDescProto { - optional string partitionName = 2; - required int32 ordinalPosition = 3; - optional string partitionValue = 4; - optional string path = 5; + required string partitionName = 1; + repeated PartitionKeyProto partitionKeys = 2; + optional string path = 3; +} + +message 
PartitionKeyProto { + required string columnName = 1; + required string partitionValue = 2; +} + + +message PartitionIdentifierProto { + required string databaseName = 1; + required string tableName = 2; + optional string partitionName = 3; } message TablespaceProto { @@ -345,6 +356,7 @@ message AlterTableDescProto { optional ColumnProto addColumn = 3; optional AlterColumnProto alterColumnName = 4; required AlterTableType alterTableType = 5; + optional PartitionDescProto partitionDesc = 6; } message AlterColumnProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java index 2c3fc6a..2761517 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java @@ -608,7 +608,8 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore { final String databaseName = split[0]; final String tableName = split[1]; - + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; switch (alterTableDescProto.getAlterTableType()) { case RENAME_TABLE: @@ -629,6 +630,22 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore { } addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn()); break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new 
AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(databaseName, tableName, partitionDesc); + break; default: //TODO } @@ -701,6 +718,59 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore { } } + private void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + Partition partition = new Partition(); + partition.setDbName(databaseName); + partition.setTableName(tableName); + + List<String> values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + partition.setValues(values); + + Table table = client.getHiveClient().getTable(databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setLocation(partitionDescProto.getPath()); + partition.setSd(sd); + + client.getHiveClient().add_partition(partition); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void dropPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + List<String> values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + 
client.getHiveClient().dropPartition(databaseName, tableName, values, true); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + @Override public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { // TODO - not implemented yet @@ -723,35 +793,48 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore { } @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException { - // TODO - not implemented yet + public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, + String tableName) throws CatalogException { + throw new UnsupportedOperationException(); } - @Override - public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException { - - } @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - return null; // TODO - not implemented yet - } + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + CatalogProtos.PartitionDescProto.Builder builder = null; - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - return null; // TODO - not implemented yet - } + try { + client = clientPool.getClient(); - @Override - public void delPartition(String partitionName) throws CatalogException { - // TODO - not implemented yet - } + Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName); + builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partition.getSd().getLocation()); - @Override - public void dropPartitions(String 
tableName) throws CatalogException { + String[] partitionNames = partitionName.split("/"); - } + for (int i = 0; i < partition.getValues().size(); i++) { + String value = partition.getValues().get(i); + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + String columnName = partitionNames[i].split("=")[0]; + keyBuilder.setColumnName(columnName); + keyBuilder.setPartitionValue(value); + builder.addPartitionKeys(keyBuilder); + } + } catch (NoSuchObjectException e) { + return null; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + return builder.build(); + } @Override public final void addFunction(final FunctionDesc func) throws CatalogException { http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java index 725f665..32ab674 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java @@ -24,10 +24,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionDesc; +import 
org.apache.tajo.catalog.partition.PartitionKey; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; @@ -40,8 +39,11 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; import java.util.List; +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; /** @@ -232,11 +234,12 @@ public class TestHCatalogStore { org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4); + expressionSchema.addColumn("n_date", TajoDataTypes.Type.TEXT); PartitionMethodDesc partitions = new PartitionMethodDesc( DB_NAME, NATION, - CatalogProtos.PartitionType.COLUMN, expressionSchema.getColumn(0).getQualifiedName(), expressionSchema); + CatalogProtos.PartitionType.COLUMN, "n_nationkey,n_date", expressionSchema); table.setPartitionMethod(partitions); store.createTable(table.getProto()); @@ -250,18 +253,80 @@ public class TestHCatalogStore { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); } - Schema partitionSchema = table.getPartitionMethod().getExpressionSchema(); Schema partitionSchema1 = table1.getPartitionMethod().getExpressionSchema(); assertEquals(partitionSchema.size(), partitionSchema1.size()); + for (int i = 0; i < partitionSchema.size(); i++) { assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName()); } + testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101"); + testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102"); + + testDropPartition(NATION, "n_nationkey=10/n_date=20150101"); + testDropPartition(NATION, "n_nationkey=20/n_date=20150102"); + + CatalogProtos.PartitionDescProto partition = 
store.getPartition(DB_NAME, NATION, "n_nationkey=10/n_date=20150101"); + assertNull(partition); + + partition = store.getPartition(DB_NAME, NATION, "n_nationkey=20/n_date=20150102"); + assertNull(partition); + store.dropTable(DB_NAME, NATION); } + private void testAddPartition(URI uri, String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(DB_NAME + "." + tableName); + alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); + + Path path = new Path(uri.getPath(), partitionName); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setPartitionName(partitionName); + + List<PartitionKey> partitionKeyList = new ArrayList<PartitionKey>(); + String[] partitionNames = partitionName.split("/"); + for(int i = 0; i < partitionNames.length; i++) { + String[] eachPartitionName = partitionNames[i].split("="); + partitionKeyList.add(new PartitionKey(eachPartitionName[0], eachPartitionName[1])); + } + partitionDesc.setPartitionKeys(partitionKeyList); + partitionDesc.setPath(path.toString()); + + alterTableDesc.setPartitionDesc(partitionDesc); + + store.alterTable(alterTableDesc.getProto()); + + CatalogProtos.PartitionDescProto resultDesc = store.getPartition(DB_NAME, NATION, partitionName); + assertNotNull(resultDesc); + assertEquals(resultDesc.getPartitionName(), partitionName); + assertEquals(resultDesc.getPath(), uri.toString() + "/" + partitionName); + assertEquals(resultDesc.getPartitionKeysCount(), 2); + + for (int i = 0; i < resultDesc.getPartitionKeysCount(); i++) { + CatalogProtos.PartitionKeyProto keyProto = resultDesc.getPartitionKeys(i); + String[] eachName = partitionNames[i].split("="); + assertEquals(keyProto.getPartitionValue(), eachName[1]); + } + } + + + private void testDropPartition(String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(DB_NAME + "." 
+ tableName); + alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setPartitionName(partitionName); + + alterTableDesc.setPartitionDesc(partitionDesc); + + store.alterTable(alterTableDesc.getProto()); + } + @Test public void testGetAllTableNames() throws Exception{ TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index c34b4d2..e9fb177 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -33,6 +33,7 @@ import org.apache.tajo.annotation.ThreadSafe; import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.store.CatalogStore; import org.apache.tajo.catalog.store.DerbyStore; @@ -69,7 +70,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; */ @ThreadSafe public class CatalogServer extends AbstractService { - private final static String DEFAULT_NAMESPACE = "public"; private final static Log LOG = LogFactory.getLog(CatalogServer.class); @@ -821,34 +821,90 @@ public class CatalogServer extends AbstractService { } @Override - public BoolProto addPartitions(RpcController controller, PartitionsProto request) 
throws ServiceException { - return ProtoUtil.TRUE; - } + public PartitionDescProto getPartitionByPartitionName(RpcController controller, PartitionIdentifierProto request) + throws ServiceException { + String databaseName = request.getDatabaseName(); + String tableName = request.getTableName(); + String partitionName = request.getPartitionName(); - @Override - public BoolProto addPartition(RpcController controller, PartitionDescProto request) throws ServiceException { - return ProtoUtil.TRUE; - } + if (metaDictionary.isSystemDatabase(databaseName)) { + throw new ServiceException(databaseName + " is a system databsae. It does not contain any partitioned tables."); + } - @Override - public PartitionDescProto getPartitionByPartitionName(RpcController controller, StringProto request) - throws ServiceException { - return null; - } + rlock.lock(); + try { + boolean contain; - @Override - public PartitionsProto getPartitionsByTableName(RpcController controller, - StringProto request) - throws ServiceException { - return null; + contain = store.existDatabase(databaseName); + if (contain) { + contain = store.existTable(databaseName, tableName); + if (contain) { + if (store.existPartitionMethod(databaseName, tableName)) { + PartitionDescProto partitionDesc = store.getPartition(databaseName, tableName, partitionName); + if (partitionDesc != null) { + return partitionDesc; + } else { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + } else { + throw new NoPartitionedTableException(databaseName, tableName); + } + } else { + throw new NoSuchTableException(tableName); + } + } else { + throw new NoSuchDatabaseException(databaseName); + } + } catch (Exception e) { + LOG.error(e); + throw new ServiceException(e); + } finally { + rlock.unlock(); + } } @Override - public PartitionsProto delAllPartitions(RpcController controller, StringProto request) - throws ServiceException { - return null; + public PartitionsProto 
getPartitionsByTableName(RpcController controller, PartitionIdentifierProto request) + throws ServiceException { + String databaseName = request.getDatabaseName(); + String tableName = request.getTableName(); + + if (metaDictionary.isSystemDatabase(databaseName)) { + throw new ServiceException(databaseName + " is a system databsae. It does not contain any partitioned tables."); + } + + rlock.lock(); + try { + boolean contain; + + contain = store.existDatabase(databaseName); + if (contain) { + contain = store.existTable(databaseName, tableName); + if (contain) { + if (store.existPartitionMethod(databaseName, tableName)) { + List<PartitionDescProto> partitions = store.getPartitions(databaseName, tableName); + PartitionsProto.Builder builder = PartitionsProto.newBuilder(); + for(PartitionDescProto partition : partitions) { + builder.addPartition(partition); + } + return builder.build(); + } else { + throw new NoPartitionedTableException(databaseName, tableName); + } + } else { + throw new NoSuchTableException(tableName); + } + } else { + throw new NoSuchDatabaseException(databaseName); + } + } catch (Exception e) { + LOG.error(e); + throw new ServiceException(e); + } finally { + rlock.unlock(); + } } - + @Override public GetTablePartitionsProto getAllPartitions(RpcController controller, NullProto request) throws ServiceException { rlock.lock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index 0ac0a54..1bb8bc5 100644 --- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -40,6 +40,7 @@ public class InfoSchemaMetadataDictionary { TABLEOPTIONS, TABLESTATS, PARTITIONS, + PARTITION_KEYS, CLUSTER, MAX_TABLE; } @@ -60,6 +61,7 @@ public class InfoSchemaMetadataDictionary { schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this)); + schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITION_KEYS.ordinal(), new PartitionKeysTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java new file mode 100644 index 0000000..ea35cef --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.dictionary; + +import org.apache.tajo.common.TajoDataTypes.Type; + +class PartitionKeysTableDescriptor extends AbstractTableDescriptor { + + private static final String TABLENAME = "partition_keys"; + private final ColumnDescriptor[] columns = new ColumnDescriptor[] { + new ColumnDescriptor("partition_id", Type.INT4, 0), + new ColumnDescriptor("column_name", Type.TEXT, 0), + new ColumnDescriptor("partition_value", Type.TEXT, 0), + }; + + public PartitionKeysTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) { + super(metadataDictionary); + } + + @Override + public String getTableNameString() { + return TABLENAME; + } + + @Override + protected ColumnDescriptor[] getColumnDescriptors() { + return columns; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java index d69c93e..a6725c0 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java +++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java @@ -24,10 +24,9 @@ class PartitionsTableDescriptor extends AbstractTableDescriptor { private static final String TABLENAME = "partitions"; private final ColumnDescriptor[] columns = new ColumnDescriptor[] { - new ColumnDescriptor("pid", Type.INT4, 0), + new ColumnDescriptor("partition_id", Type.INT4, 0), new ColumnDescriptor("tid", Type.INT4, 0), new ColumnDescriptor("partition_name", Type.TEXT, 0), - new ColumnDescriptor("ordinal_position", Type.INT4, 0), new ColumnDescriptor("path", Type.TEXT, 0) }; http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index be6bf1c..518b499 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -973,7 +973,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } String databaseName = splitted[0]; String tableName = splitted[1]; - + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; try { int databaseId = getDatabaseId(databaseName); @@ -998,6 +999,22 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } addNewColumn(tableId, alterTableDescProto.getAddColumn()); break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new 
AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(tableId, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName()); + break; default: } } catch (SQLException sqlException) { @@ -1158,6 +1175,120 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } } + public void addPartition(int tableId, CatalogProtos.PartitionDescProto partition) throws CatalogException { + Connection conn = null; + PreparedStatement pstmt = null; + final String ADD_PARTITION_SQL = + "INSERT INTO " + TB_PARTTIONS + + " (" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?,?,?)"; + + final String ADD_PARTITION_KEYS_SQL = + "INSERT INTO " + TB_PARTTION_KEYS + " (" + COL_PARTITIONS_PK + ", " + COL_COLUMN_NAME + ", " + + COL_PARTITION_VALUE + ") VALUES (?,?,?)"; + + try { + + if (LOG.isDebugEnabled()) { + LOG.debug(ADD_PARTITION_SQL); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(ADD_PARTITION_SQL); + + pstmt.setInt(1, tableId); + pstmt.setString(2, partition.getPartitionName()); + pstmt.setString(3, partition.getPath()); + pstmt.executeUpdate(); + + if (partition.getPartitionKeysCount() > 0) { + pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL); + int partitionId = getPartitionId(tableId, partition.getPartitionName()); + addPartitionKeys(pstmt, partitionId, partition); + pstmt.executeBatch(); + } + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + + public int getPartitionId(int tableId, String partitionName) throws CatalogException { + Connection conn = null; + ResultSet res = 
null; + PreparedStatement pstmt = null; + int retValue = -1; + + try { + String sql = "SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? "; + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.setString(2, partitionName); + res = pstmt.executeQuery(); + + if (res.next()) { + retValue = res.getInt(1); + } + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt, res); + } + return retValue; + } + + private void addPartitionKeys(PreparedStatement pstmt, int partitionId, PartitionDescProto partition) throws + SQLException { + for (int i = 0; i < partition.getPartitionKeysCount(); i++) { + PartitionKeyProto partitionKey = partition.getPartitionKeys(i); + + pstmt.setInt(1, partitionId); + pstmt.setString(2, partitionKey.getColumnName()); + pstmt.setString(3, partitionKey.getPartitionValue()); + + pstmt.addBatch(); + pstmt.clearParameters(); + } + } + + + private void dropPartition(int tableId, String partitionName) throws CatalogException { + Connection conn = null; + PreparedStatement pstmt = null; + + try { + int partitionId = getPartitionId(tableId, partitionName); + + String sqlDeletePartitionKeys = "DELETE FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? "; + String sqlDeletePartition = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_PARTITIONS_PK + " = ? 
"; + + if (LOG.isDebugEnabled()) { + LOG.debug(sqlDeletePartitionKeys); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(sqlDeletePartitionKeys); + pstmt.setInt(1, partitionId); + pstmt.executeUpdate(); + + pstmt = conn.prepareStatement(sqlDeletePartition); + pstmt.setInt(1, partitionId); + pstmt.executeUpdate(); + + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + private int getDatabaseId(String databaseName) throws SQLException { String sql = String.format("SELECT DB_ID from %s WHERE DB_NAME = ?", TB_DATABASES); @@ -1260,6 +1391,19 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo pstmt.executeUpdate(); pstmt.close(); + sql = "DELETE FROM " + TB_PARTTION_KEYS + + " WHERE " + COL_PARTITIONS_PK + + " IN (SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + "= ? )"; + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.executeUpdate(); + pstmt.close(); + sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + " = ? 
"; if (LOG.isDebugEnabled()) { @@ -1698,66 +1842,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo return columns; } - private static final String ADD_PARTITION_SQL = - "INSERT INTO " + TB_PARTTIONS + " (TID, PARTITION_NAME, ORDINAL_POSITION, PATH) VALUES (?,?,?,?)"; - - - @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException { - Connection conn = null; - PreparedStatement pstmt = null; - - try { - if (LOG.isDebugEnabled()) { - LOG.debug(ADD_PARTITION_SQL); - } - - String databaseName = partitionsProto.getTableIdentifier().getDatabaseName(); - String tableName = partitionsProto.getTableIdentifier().getTableName(); - - int databaseId = getDatabaseId(databaseName); - int tableId = getTableId(databaseId, databaseName, tableName); - - conn = getConnection(); - pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - - for (CatalogProtos.PartitionDescProto partition : partitionsProto.getPartitionList()) { - addPartitionInternal(pstmt, tableId, partition); - } - pstmt.executeBatch(); - conn.commit(); - } catch (SQLException se) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException e) { - LOG.error(e, e); - } - } - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(pstmt); - } - } - - private static void addPartitionInternal(PreparedStatement pstmt, int tableId, PartitionDescProto partition) throws - SQLException { - pstmt.setInt(1, tableId); - pstmt.setString(2, partition.getPartitionName()); - pstmt.setInt(3, partition.getOrdinalPosition()); - pstmt.setString(4, partition.getPath()); - pstmt.addBatch(); - pstmt.clearParameters(); - } - @Override public void addPartitionMethod(CatalogProtos.PartitionMethodProto proto) throws CatalogException { Connection conn = null; PreparedStatement pstmt = null; try { - String sql = "INSERT INTO " + TB_PARTITION_METHODS + " (TID, PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) " + - "VALUES (?,?,?,?)"; + 
String sql = "INSERT INTO " + TB_PARTITION_METHODS + + " (" + COL_TABLES_PK + ", PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) VALUES (?,?,?,?)"; if (LOG.isDebugEnabled()) { LOG.debug(sql); @@ -1789,15 +1881,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo PreparedStatement pstmt = null; try { - String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_NAME + " = ? "; + String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); pstmt.executeUpdate(); } catch (SQLException se) { throw new CatalogException(se); @@ -1815,15 +1910,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo try { String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS + - " WHERE " + COL_TABLES_NAME + " = ? "; + " WHERE " + COL_TABLES_PK + " = ? 
"; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); res = pstmt.executeQuery(); if (res.next()) { @@ -1848,15 +1946,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo try { String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS + - " WHERE " + COL_TABLES_NAME + "= ?"; + " WHERE " + COL_TABLES_PK + "= ?"; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); res = pstmt.executeQuery(); exist = res.next(); @@ -1869,90 +1970,113 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } @Override - public void addPartition(String databaseName, String tableName, - CatalogProtos.PartitionDescProto partition) throws CatalogException { + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; + PartitionDescProto.Builder builder = null; try { + String sql = "SELECT PATH, " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? 
"; + if (LOG.isDebugEnabled()) { - LOG.debug(ADD_PARTITION_SQL); + LOG.debug(sql); } int databaseId = getDatabaseId(databaseName); int tableId = getTableId(databaseId, databaseName, tableName); conn = getConnection(); - pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - addPartitionInternal(pstmt, tableId, partition); - pstmt.executeUpdate(); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.setString(2, partitionName); + res = pstmt.executeQuery(); + + if (res.next()) { + builder = PartitionDescProto.newBuilder(); + builder.setPath(res.getString("PATH")); + builder.setPartitionName(partitionName); + setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); + } else { + return null; + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } + return builder.build(); } - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - // TODO - throw new UnimplementedException("getPartition is not implemented"); - } - - - @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - // TODO - throw new UnimplementedException("getPartitions is not implemented"); - } - - - @Override - public void delPartition(String partitionName) throws CatalogException { + private void setPartitionKeys(int pid, PartitionDescProto.Builder partitionDesc) throws + CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; try { - String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE PARTITION_NAME = ? "; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } + String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE + + " FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? 
"; conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, partitionName); - pstmt.executeUpdate(); + pstmt.setInt(1, pid); + res = pstmt.executeQuery(); + + while (res.next()) { + PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder(); + builder.setColumnName(res.getString(COL_COLUMN_NAME)); + builder.setPartitionValue(res.getString(COL_PARTITION_VALUE)); + partitionDesc.addPartitionKeys(builder); + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } } @Override - public void dropPartitions(String tableName) throws CatalogException { + public List<PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; + PartitionDescProto.Builder builder = null; + List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>(); try { - String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_NAME + "= ? "; + String sql = "SELECT PATH, PARTITION_NAME, " + COL_PARTITIONS_PK + " FROM " + + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? 
"; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); - pstmt.executeUpdate(); + pstmt.setInt(1, tableId); + res = pstmt.executeQuery(); + + while (res.next()) { + builder = PartitionDescProto.newBuilder(); + builder.setPath(res.getString("PATH")); + builder.setPartitionName(res.getString("PARTITION_NAME")); + setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); + partitions.add(builder.build()); + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } + return partitions; } - + @Override public List<TablePartitionProto> getAllPartitions() throws CatalogException { Connection conn = null; @@ -1962,20 +2086,20 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo List<TablePartitionProto> partitions = new ArrayList<TablePartitionProto>(); try { - String sql = "SELECT PID, TID, PARTITION_NAME, ORDINAL_POSITION, PATH FROM " + TB_PARTTIONS; + String sql = "SELECT " + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", PARTITION_NAME, " + + " PATH FROM " + TB_PARTTIONS; conn = getConnection(); stmt = conn.createStatement(); resultSet = stmt.executeQuery(sql); while (resultSet.next()) { TablePartitionProto.Builder builder = TablePartitionProto.newBuilder(); - - builder.setPid(resultSet.getInt("PID")); - builder.setTid(resultSet.getInt("TID")); + + builder.setPartitionId(resultSet.getInt(COL_PARTITIONS_PK)); + builder.setTid(resultSet.getInt(COL_TABLES_PK)); builder.setPartitionName(resultSet.getString("PARTITION_NAME")); - builder.setOrdinalPosition(resultSet.getInt("ORDINAL_POSITION")); builder.setPath(resultSet.getString("PATH")); - + partitions.add(builder.build()); } } catch (SQLException se) { @@ -1983,7 +2107,7 @@ public abstract class 
AbstractDBStore extends CatalogConstants implements Catalo } finally { CatalogUtil.closeQuietly(stmt, resultSet); } - + return partitions; } @@ -2003,7 +2127,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo String sql = "INSERT INTO " + TB_INDEXES + " (" + COL_DATABASES_PK + ", " + COL_TABLES_PK + ", INDEX_NAME, " + - "COLUMN_NAME, DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) VALUES (?,?,?,?,?,?,?,?,?)"; + "" + COL_COLUMN_NAME + ", DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) " + + "VALUES (?,?,?,?,?,?,?,?,?)"; if (LOG.isDebugEnabled()) { LOG.debug(sql); http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java index 6d0876f..be9727e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java @@ -204,6 +204,19 @@ public abstract class AbstractMySQLMariaDBStore extends AbstractDBStore { baseTableMaps.put(TB_PARTTIONS, true); } + // PARTITION_KEYS + if (!baseTableMaps.get(TB_PARTTION_KEYS)) { + String sql = readSchemaFile("partition_params.sql"); + + if (LOG.isDebugEnabled()) { + LOG.debug(sql.toString()); + } + + stmt.executeUpdate(sql.toString()); + LOG.info("Table '" + TB_PARTTION_KEYS + "' is created."); + baseTableMaps.put(TB_PARTTION_KEYS, true); + } + insertSchemaVersion(); } catch (SQLException se) { @@ -270,6 +283,7 @@ public abstract class AbstractMySQLMariaDBStore extends AbstractDBStore { 
baseTableMaps.put(TB_INDEXES, false); baseTableMaps.put(TB_PARTITION_METHODS, false); baseTableMaps.put(TB_PARTTIONS, false); + baseTableMaps.put(TB_PARTTION_KEYS, false); if (res.wasNull()) return false; http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index ed6fedc..57ee74f 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -102,25 +102,17 @@ public interface CatalogStore extends Closeable { /************************** PARTITIONS *****************************/ - void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException; - - void addPartition(String databaseName, String tableName, - CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException; - /** * Get all partitions of a table * @param tableName the table name * @return * @throws CatalogException */ - CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException; + List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException; - CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException; + CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException; - void delPartition(String partitionName) throws CatalogException; - - void dropPartitions(String tableName) throws CatalogException; - List<TablePartitionProto> getAllPartitions() throws CatalogException; 
/**************************** INDEX *******************************/ http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index e37efe6..470f09d 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -29,6 +29,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; @@ -58,6 +59,7 @@ public class MemStore implements CatalogStore { private final Map<String, CatalogProtos.FunctionDescProto> functions = Maps.newHashMap(); private final Map<String, Map<String, IndexDescProto>> indexes = Maps.newHashMap(); private final Map<String, Map<String, IndexDescProto>> indexesByColumn = Maps.newHashMap(); + private final Map<String, Map<String, CatalogProtos.PartitionDescProto>> partitions = Maps.newHashMap(); public MemStore(Configuration conf) { } @@ -67,6 +69,7 @@ public class MemStore implements CatalogStore { databases.clear(); functions.clear(); indexes.clear(); + partitions.clear(); } @Override @@ -270,6 +273,8 @@ public class MemStore implements CatalogStore { final CatalogProtos.TableDescProto tableDescProto = database.get(tableName); CatalogProtos.TableDescProto newTableDescProto; CatalogProtos.SchemaProto 
schemaProto; + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; switch (alterTableDescProto.getAlterTableType()) { case RENAME_TABLE: @@ -304,11 +309,52 @@ public class MemStore implements CatalogStore { newTableDescProto = tableDescProto.toBuilder().setSchema(newSchemaProto).build(); database.put(tableName, newTableDescProto); break; + case ADD_PARTITION: + partitionDesc = alterTableDescProto.getPartitionDesc(); + partitionName = partitionDesc.getPartitionName(); + + if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } else { + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partitionDesc.getPath()); + + if (partitionDesc.getPartitionKeysCount() > 0) { + int i = 0; + for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(eachKey.getColumnName()); + keyBuilder.setPartitionValue(eachKey.getPartitionValue()); + builder.setPartitionKeys(i, keyBuilder.build()); + i++; + } + } + + Map<String, CatalogProtos.PartitionDescProto> protoMap = null; + if (!partitions.containsKey(tableName)) { + protoMap = Maps.newHashMap(); + } else { + protoMap = partitions.get(tableName); + } + protoMap.put(partitionName, builder.build()); + partitions.put(tableName, protoMap); + } + break; + case DROP_PARTITION: + partitionDesc = alterTableDescProto.getPartitionDesc(); + partitionName = partitionDesc.getPartitionName(); + if(!partitions.containsKey(tableName)) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } else { + partitions.remove(partitionName); + } + break; default: - //TODO } } + private int 
getIndexOfColumnToBeRenamed(List<CatalogProtos.ColumnProto> fieldList, String columnName) { int fieldCount = fieldList.size(); for (int index = 0; index < fieldCount; index++) { @@ -498,39 +544,50 @@ public class MemStore implements CatalogStore { } @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionDescList) throws CatalogException { - throw new RuntimeException("not supported!"); - } + public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException { + List<CatalogProtos.PartitionDescProto> protos = new ArrayList<CatalogProtos.PartitionDescProto>(); - @Override - public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto - partitionDescProto) throws CatalogException { - throw new RuntimeException("not supported!"); + if (partitions.containsKey(tableName)) { + for (CatalogProtos.PartitionDescProto proto : partitions.get(tableName).values()) { + protos.add(proto); + } + } + return protos; } @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - throw new RuntimeException("not supported!"); + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { + if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { + return partitions.get(tableName).get(partitionName); + } else { + throw new NoSuchPartitionException(partitionName); + } } - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - throw new RuntimeException("not supported!"); - } + public List<TablePartitionProto> getAllPartitions() throws CatalogException { + List<TablePartitionProto> protos = new ArrayList<TablePartitionProto>(); + Set<String> tables = partitions.keySet(); + for (String table : tables) { + Map<String, CatalogProtos.PartitionDescProto> 
entryMap = partitions.get(table); + for (Map.Entry<String, CatalogProtos.PartitionDescProto> proto : entryMap.entrySet()) { + CatalogProtos.PartitionDescProto partitionDescProto = proto.getValue(); - @Override - public void delPartition(String partitionName) throws CatalogException { - throw new RuntimeException("not supported!"); - } + TablePartitionProto.Builder builder = TablePartitionProto.newBuilder(); - @Override - public void dropPartitions(String tableName) throws CatalogException { - throw new RuntimeException("not supported!"); - } - - @Override - public List<TablePartitionProto> getAllPartitions() throws CatalogException { - throw new UnsupportedOperationException(); + builder.setPartitionName(partitionDescProto.getPartitionName()); + builder.setPath(partitionDescProto.getPath()); + + // PARTITION_ID and TID is always necessary variables. In other CatalogStore excepting MemStore, + // all partitions would have PARTITION_ID and TID. But MemStore doesn't contain these variable values because + // it is implemented for test purpose. Thus, we need to set each variables to 0. + builder.setPartitionId(0); + builder.setTid(0); + + protos.add(builder.build()); + } + } + return protos; } /* (non-Javadoc)
