TAJO-1493: Make partition pruning based on catalog information.
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/51bb94d9 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/51bb94d9 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/51bb94d9 Branch: refs/heads/branch-0.11.0 Commit: 51bb94d9f0d6c4598816b5bddad4f1331ccb1e58 Parents: fcc0c03 Author: JaeHwa Jung <[email protected]> Authored: Thu Sep 24 18:12:12 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Sep 24 18:12:12 2015 +0900 ---------------------------------------------------------------------- CHANGES | 10 +- .../tajo/catalog/AbstractCatalogClient.java | 54 +- .../src/main/proto/CatalogProtocol.proto | 4 +- .../apache/tajo/catalog/CatalogConstants.java | 3 +- .../org/apache/tajo/catalog/CatalogService.java | 10 +- .../org/apache/tajo/catalog/CatalogUtil.java | 30 +- .../org/apache/tajo/catalog/DDLBuilder.java | 4 +- .../tajo/catalog/partition/PartitionDesc.java | 30 +- .../src/main/proto/CatalogProtos.proto | 16 +- .../tajo/catalog/store/HiveCatalogStore.java | 222 ++++++- .../catalog/store/TestHiveCatalogStore.java | 140 ++++- tajo-catalog/tajo-catalog-server/pom.xml | 8 + .../org/apache/tajo/catalog/CatalogServer.java | 73 ++- .../tajo/catalog/store/AbstractDBStore.java | 308 +++++++++- .../apache/tajo/catalog/store/CatalogStore.java | 88 ++- .../src/main/resources/schemas/derby/derby.xml | 4 +- .../main/resources/schemas/mariadb/mariadb.xml | 4 +- .../src/main/resources/schemas/mysql/mysql.xml | 4 +- .../main/resources/schemas/oracle/oracle.xml | 4 +- .../resources/schemas/postgresql/postgresql.xml | 4 +- .../apache/tajo/catalog/CatalogTestingUtil.java | 4 +- .../org/apache/tajo/catalog/TestCatalog.java | 109 +++- .../TestCatalogAgainstCaseSensitivity.java | 2 +- .../org/apache/tajo/cli/tools/TajoDump.java | 8 +- .../apache/tajo/client/CatalogAdminClient.java | 2 +- .../tajo/client/CatalogAdminClientImpl.java | 2 +- .../org/apache/tajo/client/TajoClientImpl.java | 5 +- 
.../apache/tajo/exception/ErrorMessages.java | 1 + .../apache/tajo/exception/ExceptionUtil.java | 1 + .../apache/tajo/exception/ReturnStateUtil.java | 4 + .../UndefinedPartitionMethodException.java | 4 +- tajo-common/src/main/proto/errors.proto | 1 + .../planner/TestEvalNodeToExprConverter.java | 406 +++++++++++++ .../tajo/engine/query/TestAlterTable.java | 6 +- .../tajo/engine/query/TestTablePartitions.java | 526 ++++++++++++++++- .../planner/physical/ColPartitionStoreExec.java | 12 +- .../tajo/master/TajoMasterClientService.java | 2 +- .../apache/tajo/master/exec/DDLExecutor.java | 24 +- .../java/org/apache/tajo/querymaster/Query.java | 23 +- .../apache/tajo/plan/expr/AlgebraicUtil.java | 148 ++++- .../rewrite/rules/PartitionedTableRewriter.java | 171 ++++-- .../tajo/plan/util/EvalNodeToExprConverter.java | 297 ++++++++++ .../util/PartitionFilterAlgebraVisitor.java | 573 +++++++++++++++++++ .../org/apache/tajo/storage/FileTablespace.java | 4 +- 44 files changed, 3210 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8bc7409..26882d7 100644 --- a/CHANGES +++ b/CHANGES @@ -572,9 +572,7 @@ Release 0.11.0 - unreleased TASKS - TAJO-1673: Implement recover partitions. (jaehwa) - - TAJO-1872: Increase the minimum split size and add a classpath to hadoop + TAJO-1872: Increase the minimum split size and add a classpath to hadoop tools. (jihoon) TAJO-1870: Enable tests of tajo-storage-pgsql module when arch type @@ -673,7 +671,11 @@ Release 0.11.0 - unreleased SUB TASKS - TAJO-1465: Add ORCFileAppender to write into ORCFile table. + TAJO-1493: Make partition pruning based on catalog informations. (jaehwa) + + TAJO-1673: Implement recover partitions. (jaehwa) + + TAJO-1465: Add ORCFileAppender to write into ORCFile table. 
(Contributed by Joyngyoung Park, committed by hyunsik) TAJO-1841: Eliminate explicit diamond expressions in tajo-{algebra,rpc} http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 1dc7a71..912d24d 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -407,6 +407,29 @@ public abstract class AbstractCatalogClient implements CatalogService, Closeable } @Override + public boolean existPartitions(String databaseName, String tableName) throws UndefinedDatabaseException, + UndefinedTableException, UndefinedPartitionMethodException { + + try { + final BlockingInterface stub = getStub(); + final TableIdentifierProto request = buildTableIdentifier(databaseName, tableName); + final ReturnState state = stub.existsPartitions(null, request); + + if (isThisError(state, UNDEFINED_PARTITIONS)) { + return false; + } + throwsIfThisError(state, UndefinedDatabaseException.class); + throwsIfThisError(state, UndefinedTableException.class); + throwsIfThisError(state, UndefinedPartitionMethodException.class); + ensureOk(state); + return true; + + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + + @Override public final PartitionDescProto getPartition(final String databaseName, final String tableName, final String partitionName) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionException, @@ -434,15 +457,16 @@ public abstract class AbstractCatalogClient implements CatalogService, Closeable } 
@Override - public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) { + public final List<PartitionDescProto> getPartitionsOfTable(final String databaseName, final String tableName) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException { try { final BlockingInterface stub = getStub(); - final PartitionIdentifierProto request = PartitionIdentifierProto.newBuilder() - .setDatabaseName(databaseName) - .setTableName(tableName) - .build(); + final TableIdentifierProto request = buildTableIdentifier(databaseName, tableName); final GetPartitionsResponse response = stub.getPartitionsByTableName(null, request); + throwsIfThisError(response.getState(), UndefinedDatabaseException.class); + throwsIfThisError(response.getState(), UndefinedTableException.class); + throwsIfThisError(response.getState(), UndefinedPartitionMethodException.class); ensureOk(response.getState()); return response.getPartitionList(); @@ -452,6 +476,26 @@ public abstract class AbstractCatalogClient implements CatalogService, Closeable } @Override + public List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UnsupportedException { + try { + final BlockingInterface stub = getStub(); + GetPartitionsResponse response = stub.getPartitionsByAlgebra(null, request); + + throwsIfThisError(response.getState(), UndefinedDatabaseException.class); + throwsIfThisError(response.getState(), UndefinedTableException.class); + throwsIfThisError(response.getState(), UndefinedPartitionMethodException.class); + throwsIfThisError(response.getState(), UnsupportedException.class); + ensureOk(response.getState()); + return response.getPartitionList(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override public List<TablePartitionProto> getAllPartitions() { try { final 
BlockingInterface stub = getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index 8cc8e2f..a3a904b 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -120,10 +120,12 @@ service CatalogProtocolService { rpc getPartitionMethodByTableName(TableIdentifierProto) returns (GetPartitionMethodResponse); rpc existPartitionMethod(TableIdentifierProto) returns (ReturnState); + rpc existsPartitions(TableIdentifierProto) returns (ReturnState); rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (GetPartitionDescResponse); - rpc getPartitionsByTableName(PartitionIdentifierProto) returns (GetPartitionsResponse); + rpc getPartitionsByTableName(TableIdentifierProto) returns (GetPartitionsResponse); rpc getAllPartitions(NullProto) returns (GetTablePartitionsResponse); rpc addPartitions(AddPartitionsProto) returns (ReturnState); + rpc getPartitionsByAlgebra(PartitionsByAlgebraProto) returns (GetPartitionsResponse); rpc createIndex(IndexDescProto) returns (ReturnState); rpc dropIndex(IndexNameProto) returns (ReturnState); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index 721bcf1..f2acf98 100644 --- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -59,6 +59,7 @@ public class CatalogConstants { public static final String COL_PARTITIONS_PK = "PARTITION_ID"; public static final String COL_COLUMN_NAME = "COLUMN_NAME"; public static final String COL_PARTITION_VALUE = "PARTITION_VALUE"; - + public static final String COL_PARTITION_BYTES = "NUM_BYTES"; + public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema"; } http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index b031313..2140b49 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -168,11 +168,19 @@ public interface CatalogService { boolean existPartitionMethod(String databaseName, String tableName) throws UndefinedTableException, UndefinedDatabaseException; + boolean existPartitions(String databaseName, String tableName) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException; + PartitionDescProto getPartition(String databaseName, String tableName, String partitionName) throws UndefinedPartitionException, UndefinedPartitionMethodException, UndefinedDatabaseException, UndefinedTableException; - List<PartitionDescProto> getPartitions(String databaseName, String tableName); + List<PartitionDescProto> getPartitionsOfTable(String databaseName, String tableName) throws UndefinedDatabaseException, + 
UndefinedTableException, UndefinedPartitionMethodException; + + List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UnsupportedException; List<TablePartitionProto> getAllPartitions(); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 4e52de1..665f9ec 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -812,16 +812,33 @@ public class CatalogUtil { /** * Converts passed parameters to a AlterTableDesc. This method would be called when adding a partition or dropping * a table. This creates AlterTableDesc that is a wrapper class for protocol buffer. + * * + * @param tableName + * @param columns + * @param values + * @param path + * @param alterTableType + * @return + */ + public static AlterTableDesc addOrDropPartition(String tableName, String[] columns, String[] values, @Nullable + String path, AlterTableType alterTableType) { + return addOrDropPartition(tableName, columns, values, path, alterTableType, 0L); + } + /** + * Converts passed parameters to a AlterTableDesc. This method would be called when adding a partition or dropping + * a table. This creates AlterTableDesc that is a wrapper class for protocol buffer. 
* * @param tableName table name * @param columns partition column names * @param values partition values - * @param location partition location + * @param path partition directory path * @param alterTableType ADD_PARTITION or DROP_PARTITION + * @param numBytes contents length * @return AlterTableDesc */ - public static AlterTableDesc addOrDropPartition(String tableName, String[] columns, - String[] values, String location, AlterTableType alterTableType) { + public static AlterTableDesc addOrDropPartition(String tableName, String[] columns, String[] values, + @Nullable String path, AlterTableType alterTableType, long numBytes) { + final AlterTableDesc alterTableDesc = new AlterTableDesc(); alterTableDesc.setTableName(tableName); @@ -831,8 +848,11 @@ public class CatalogUtil { partitionDesc.setPartitionKeys(pair.getFirst()); partitionDesc.setPartitionName(pair.getSecond()); - if (alterTableType.equals(AlterTableType.ADD_PARTITION) && location != null) { - partitionDesc.setPath(location); + if (alterTableType.equals(AlterTableType.ADD_PARTITION)) { + if (path != null) { + partitionDesc.setPath(path); + } + partitionDesc.setNumBytes(numBytes); } alterTableDesc.setPartitionDesc(partitionDesc); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java index 920fe83..7aecc01 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import 
org.apache.tajo.util.KeyValueSet; +import java.io.File; import java.util.List; import java.util.Map; @@ -166,8 +167,7 @@ public class DDLBuilder { List<Column> colums = table.getPartitionMethod().getExpressionSchema().getAllColumns(); - String[] splitPartitionName = partition.getPartitionName().split("/"); - + String[] splitPartitionName = partition.getPartitionName().split(File.separator); for(int i = 0; i < splitPartitionName.length; i++) { String[] partitionColumnValue = splitPartitionName[i].split("="); if (i > 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java index 7287fce..e41ac85 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java @@ -27,6 +27,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto; +import org.apache.tajo.util.TUtil; import java.util.List; @@ -56,6 +57,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro @Expose protected String partitionName; @Expose protected List<PartitionKeyProto> partitionKeys; @Expose protected String path; //optional + @Expose private Long numBytes = null; // optional private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); @@ -83,27 +85,30 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro 
this.partitionKeys = partitionKeys; } + public Long getNumBytes() { + return numBytes; + } + + public void setNumBytes(Long numBytes) { + this.numBytes = numBytes; + } + public int hashCode() { - return Objects.hashCode(partitionName, partitionKeys, path); + return Objects.hashCode(partitionName, partitionKeys, path, numBytes); } public boolean equals(Object o) { if (o instanceof PartitionDesc) { PartitionDesc another = (PartitionDesc) o; - boolean eq = ((partitionName != null && another.partitionName != null - && partitionName.equals(another.partitionName)) || - (partitionName == null && another.partitionName == null)); - eq = eq && ((partitionKeys != null && another.partitionKeys != null - && partitionKeys.equals(another.partitionKeys)) - || (partitionKeys == null && another.partitionKeys == null)); - eq = eq && ((path != null && another.path != null && path.equals(another.path)) || - (path == null && another.path == null)); + boolean eq = this.partitionName.equals(another.partitionName); + eq = eq && this.partitionKeys.equals(another.partitionKeys); + eq = eq && this.path.equals(another.path); + eq = eq && TUtil.checkEquals(this.numBytes, another.numBytes); return eq; } return false; } - @Override public CatalogProtos.PartitionDescProto getProto() { if (builder == null) { @@ -125,6 +130,10 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro builder.setPath(this.path); } + if(this.numBytes != null) { + builder.setNumBytes(this.numBytes); + } + return builder.build(); } @@ -149,6 +158,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro desc.partitionName = partitionName; desc.partitionKeys = partitionKeys; desc.path = path; + desc.numBytes = numBytes; return desc; } http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 0b7a1b4..08818b1 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -248,12 +248,12 @@ message PartitionDescProto { repeated PartitionKeyProto partitionKeys = 2; optional string path = 3; optional int32 id = 4; + optional int64 numBytes = 5; } message PartitionKeyProto { required string columnName = 1; - optional string parentColumnName = 2; - required string partitionValue = 3; + required string partitionValue = 2; } message PartitionIdentifierProto { @@ -262,6 +262,18 @@ message PartitionIdentifierProto { optional string partitionName = 3; } +message PartitionsByAlgebraProto { + required string databaseName = 1; + required string tableName = 2; + required string algebra = 3; // json object which contains algebra expressions +} + +message PartitionsByFilterProto { + required string databaseName = 1; + required string tableName = 2; + required string filter = 3; // filter string: (col1 ='1' or col1 = '100') and col3 > 20 +} + message TablespaceProto { required string spaceName = 1; required string uri = 2; http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index e2229ba..87cc93f 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ 
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.serde.serdeConstants; @@ -34,6 +35,9 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.tajo.BuiltinStorages; import org.apache.tajo.TajoConstants; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.algebra.IsNullPredicate; +import org.apache.tajo.algebra.JsonHelper; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -42,11 +46,14 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.*; +import org.apache.tajo.plan.expr.AlgebraicUtil; +import org.apache.tajo.plan.util.PartitionFilterAlgebraVisitor; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; import org.apache.thrift.TException; +import java.io.File; import java.io.IOException; import java.util.*; @@ -693,7 +700,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { Table table = client.getHiveClient().getTable(databaseName, tableName); List<FieldSchema> columns = table.getSd().getCols(); columns.add(new FieldSchema(columnProto.getName(), - HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), "")); + HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), "")); 
client.getHiveClient().alter_table(databaseName, tableName, table); @@ -718,6 +725,10 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { partition.setDbName(databaseName); partition.setTableName(tableName); + Map<String, String> params = TUtil.newHashMap(); + params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(partitionDescProto.getNumBytes())); + partition.setParameters(params); + List<String> values = Lists.newArrayList(); for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { values.add(keyProto.getPartitionValue()); @@ -845,11 +856,204 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { } @Override - public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, - String tableName) { - throw new UnsupportedOperationException(); + public boolean existPartitions(String databaseName, String tableName) throws UndefinedDatabaseException, + UndefinedTableException, UndefinedPartitionMethodException { + + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + boolean result = false; + + try { + client = clientPool.getClient(); + List<Partition> partitions = client.getHiveClient().listPartitionsByFilter(databaseName, tableName, + "", (short) -1); + + if (partitions.size() > 0) { + result = true; + } + } catch (Exception e) { + throw new TajoInternalError(e); + } finally { + if (client != null) { + client.release(); + } + } + + return result; + } + + @Override + public List<CatalogProtos.PartitionDescProto> getPartitionsOfTable(String databaseName, String tableName) + throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException { + List<PartitionDescProto> list = null; + + try { + if (!existDatabase(databaseName)) { + throw new UndefinedDatabaseException(tableName); + } + + if (!existTable(databaseName, tableName)) { + throw new UndefinedTableException(tableName); + } + + if 
(!existPartitionMethod(databaseName, tableName)) { + throw new UndefinedPartitionMethodException(tableName); + } + + list = getPartitionsFromHiveMetaStore(databaseName, tableName, ""); + } catch (Exception se) { + throw new TajoInternalError(se); + } + + return list; + } + + @Override + public List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UnsupportedException { + + List<PartitionDescProto> list = null; + + try { + String databaseName = request.getDatabaseName(); + String tableName = request.getTableName(); + + if (!existDatabase(databaseName)) { + throw new UndefinedDatabaseException(tableName); + } + + if (!existTable(databaseName, tableName)) { + throw new UndefinedTableException(tableName); + } + + if (!existPartitionMethod(databaseName, tableName)) { + throw new UndefinedPartitionMethodException(tableName); + } + + TableDescProto tableDesc = getTable(databaseName, tableName); + String filter = getFilter(databaseName, tableName, tableDesc.getPartition().getExpressionSchema().getFieldsList() + , request.getAlgebra()); + list = getPartitionsFromHiveMetaStore(databaseName, tableName, filter); + } catch (UnsupportedException ue) { + throw ue; + } catch (Exception se) { + throw new TajoInternalError(se); + } + + return list; + } + + private String getFilter(String databaseName, String tableName, List<ColumnProto> partitionColumns + , String json) throws TajoException { + + Expr[] exprs = null; + + if (json != null && !json.isEmpty()) { + Expr algebra = JsonHelper.fromJson(json, Expr.class); + exprs = AlgebraicUtil.toConjunctiveNormalFormArray(algebra); + } + + PartitionFilterAlgebraVisitor visitor = new PartitionFilterAlgebraVisitor(); + visitor.setIsHiveCatalog(true); + + Expr[] filters = AlgebraicUtil.getRearrangedCNFExpressions(databaseName + "." 
+ tableName, partitionColumns, exprs); + + StringBuffer sb = new StringBuffer(); + + // Write join clause from second column to last column. + Column target; + + String result; + for (int i = 0; i < partitionColumns.size(); i++) { + target = new Column(partitionColumns.get(i)); + + if (!(filters[i] instanceof IsNullPredicate)) { + visitor.setColumn(target); + visitor.visit(null, new Stack<Expr>(), filters[i]); + result = visitor.getResult(); + + // If visitor build filter successfully, add filter to be used for executing hive api. + if (result.length() > 0) { + if (sb.length() > 0) { + sb.append(" AND "); + } + sb.append(" ( ").append(result).append(" ) "); + } else { + throw new TajoInternalError("Filter does not exist : " + filters[i].toJson()); + } + } + } + + return sb.toString(); } + /** + * Get list of partitions matching specified filter. + * + * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). + * Assume that an user want to give a condition WHERE (col1 ='1' or col1 = '100') and col3 > 20 . 
+ * + * Then, the filter string would be written as following: + * (col1 =\"1\" or col1 = \"100\") and col3 > 20 + * + * + * @param databaseName + * @param tableName + * @param filter + * @return + */ + private List<PartitionDescProto> getPartitionsFromHiveMetaStore(String databaseName, String tableName, + String filter) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + List<PartitionDescProto> partitions = null; + TableDescProto tableDesc = null; + List<ColumnProto> parititonColumns = null; + + try { + partitions = TUtil.newList(); + client = clientPool.getClient(); + + List<Partition> hivePartitions = client.getHiveClient().listPartitionsByFilter(databaseName, tableName + , filter, (short) -1); + + tableDesc = getTable(databaseName, tableName); + parititonColumns = tableDesc.getPartition().getExpressionSchema().getFieldsList(); + + StringBuilder partitionName = new StringBuilder(); + for (Partition hivePartition : hivePartitions) { + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPath(hivePartition.getSd().getLocation()); + + partitionName.delete(0, partitionName.length()); + for (int i = 0; i < parititonColumns.size(); i++) { + if (i > 0) { + partitionName.append(File.separator); + } + partitionName.append(CatalogUtil.extractSimpleName(parititonColumns.get(i).getName())); + partitionName.append("="); + partitionName.append(hivePartition.getValues().get(i)); + } + + builder.setPartitionName(partitionName.toString()); + + Map<String, String> params = hivePartition.getParameters(); + if (params != null) { + if (params.get(StatsSetupConst.TOTAL_SIZE) != null) { + builder.setNumBytes(Long.parseLong(params.get(StatsSetupConst.TOTAL_SIZE))); + } + } + + partitions.add(builder.build()); + } + } catch (Exception e) { + throw new TajoInternalError(e); + } finally { + if (client != null) { + client.release(); + } + } + + return partitions; + } @Override public 
CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, @@ -875,6 +1079,14 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { keyBuilder.setColumnName(columnName); keyBuilder.setPartitionValue(value); builder.addPartitionKeys(keyBuilder); + + Map<String, String> params = partition.getParameters(); + if (params != null) { + if (params.get(StatsSetupConst.TOTAL_SIZE) != null) { + builder.setNumBytes(Long.parseLong(params.get(StatsSetupConst.TOTAL_SIZE))); + } + } + } } catch (NoSuchObjectException e) { return null; @@ -1031,7 +1243,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { // Unfortunately, hive client add_partitions doesn't run as expected. The method never read the ifNotExists // parameter. So, if Tajo adds existing partition to Hive, it will threw AlreadyExistsException. To avoid // above error, we need to filter existing partitions before call add_partitions. - if (existingPartition != null) { + if (existingPartition == null) { Partition partition = new Partition(); partition.setDbName(databaseName); partition.setTableName(tableName); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index b3af179..8d4b10b 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -34,6 +34,7 @@ import org.apache.tajo.conf.TajoConf; import 
org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -261,10 +262,27 @@ public class TestHiveCatalogStore { } testAddPartition(table1.getUri(), NATION, "n_nationkey=10/n_date=20150101"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=10/n_date=20150102"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=20/n_date=20150101"); testAddPartition(table1.getUri(), NATION, "n_nationkey=20/n_date=20150102"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=30/n_date=20150101"); + testAddPartition(table1.getUri(), NATION, "n_nationkey=30/n_date=20150102"); + + List<String> partitionNames = TUtil.newList(); + partitionNames.add("n_nationkey=40/n_date=20150801"); + partitionNames.add("n_nationkey=40/n_date=20150802"); + partitionNames.add("n_nationkey=50/n_date=20150801"); + partitionNames.add("n_nationkey=50/n_date=20150802"); + testAddPartitions(table1.getUri(), NATION, partitionNames); + + testGetPartitionsByAlgebra(DB_NAME, NATION); testDropPartition(NATION, "n_nationkey=10/n_date=20150101"); + testDropPartition(NATION, "n_nationkey=10/n_date=20150102"); + testDropPartition(NATION, "n_nationkey=20/n_date=20150101"); testDropPartition(NATION, "n_nationkey=20/n_date=20150102"); + testDropPartition(NATION, "n_nationkey=30/n_date=20150101"); + testDropPartition(NATION, "n_nationkey=30/n_date=20150102"); CatalogProtos.PartitionDescProto partition = store.getPartition(DB_NAME, NATION, "n_nationkey=10/n_date=20150101"); assertNull(partition); @@ -275,6 +293,87 @@ public class TestHiveCatalogStore { store.dropTable(DB_NAME, NATION); } + private void testGetPartitionsByAlgebra(String databaseName, String tableName) throws Exception { + String qfTableName = databaseName + "." 
+ tableName; + + // Equals Operator + CatalogProtos.PartitionsByAlgebraProto.Builder request = CatalogProtos.PartitionsByAlgebraProto.newBuilder(); + request.setDatabaseName(databaseName); + request.setTableName(tableName); + + String algebra = "{\n" + + " \"LeftExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"n_nationkey\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"10\",\n" + + " \"ValueType\": \"Unsigned_Integer\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"n_date\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"20150101\",\n" + + " \"ValueType\": \"String\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + + " },\n" + + " \"OpType\": \"And\"\n" + + "}"; + + request.setAlgebra(algebra); + + List<CatalogProtos.PartitionDescProto> partitions = store.getPartitionsByAlgebra(request.build()); + assertNotNull(partitions); + assertEquals(1, partitions.size()); + + // OR + algebra = "{\n" + + " \"LeftExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"n_nationkey\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"20\",\n" + + " \"ValueType\": \"Unsigned_Integer\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"n_nationkey\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"30\",\n" + + " \"ValueType\": \"Unsigned_Integer\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + 
+ " },\n" + + " \"OpType\": \"Or\"\n" + + "}"; + + request.setAlgebra(algebra); + + partitions = store.getPartitionsByAlgebra(request.build()); + assertNotNull(partitions); + assertEquals(4, partitions.size()); + } private void testAddPartition(URI uri, String tableName, String partitionName) throws Exception { AlterTableDesc alterTableDesc = new AlterTableDesc(); @@ -316,6 +415,45 @@ public class TestHiveCatalogStore { } } + private void testAddPartitions(URI uri, String tableName, List<String> partitionNames) throws Exception { + List<CatalogProtos.PartitionDescProto> partitions = TUtil.newList(); + for (String partitionName : partitionNames) { + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + Path path = new Path(uri.getPath(), partitionName); + builder.setPath(path.toString()); + + List<PartitionKeyProto> partitionKeyList = new ArrayList<PartitionKeyProto>(); + String[] split = partitionName.split("/"); + for(int i = 0; i < split.length; i++) { + String[] eachPartitionName = split[i].split("="); + + PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(eachPartitionName[0]); + keyBuilder.setPartitionValue(eachPartitionName[1]); + partitionKeyList.add(keyBuilder.build()); + } + builder.addAllPartitionKeys(partitionKeyList); + partitions.add(builder.build()); + } + + store.addPartitions(DB_NAME, tableName, partitions, true); + + for (String partitionName : partitionNames) { + CatalogProtos.PartitionDescProto resultDesc = store.getPartition(DB_NAME, NATION, partitionName); + assertNotNull(resultDesc); + assertEquals(resultDesc.getPartitionName(), partitionName); + assertEquals(resultDesc.getPath(), uri.toString() + "/" + partitionName); + assertEquals(resultDesc.getPartitionKeysCount(), 2); + + String[] split = partitionName.split("/"); + for (int i = 0; i < resultDesc.getPartitionKeysCount(); i++) { + 
CatalogProtos.PartitionKeyProto keyProto = resultDesc.getPartitionKeys(i); + String[] eachName = split[i].split("="); + assertEquals(keyProto.getPartitionValue(), eachName[1]); + } + } + } private void testDropPartition(String tableName, String partitionName) throws Exception { AlterTableDesc alterTableDesc = new AlterTableDesc(); @@ -405,7 +543,7 @@ public class TestHiveCatalogStore { } assertEquals(StorageConstants.DEFAULT_BINARY_SERDE, - table1.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE)); + table1.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE)); store.dropTable(DB_NAME, REGION); } http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml index 0b5afbb..922e9b1 100644 --- a/tajo-catalog/tajo-catalog-server/pom.xml +++ b/tajo-catalog/tajo-catalog-server/pom.xml @@ -148,6 +148,14 @@ <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-algebra</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index a60ddda..656ef67 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -35,6 +35,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.annotation.ThreadSafe; import org.apache.tajo.catalog.CatalogProtocol.*; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.exception.*; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.store.CatalogStore; @@ -44,6 +45,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.rpc.BlockingRpcServer; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; @@ -923,6 +925,38 @@ public class CatalogServer extends AbstractService { } @Override + public ReturnState existsPartitions(RpcController controller, TableIdentifierProto request) throws + ServiceException { + + String dbName = request.getDatabaseName(); + String tbName = request.getTableName(); + + // linked meta data do not support partition. + // So, the request that wants to get partitions in this db will be failed. 
+ if (linkedMetadataManager.existsDatabase(dbName)) { + return errUndefinedPartitionMethod(tbName); + } + + if (metaDictionary.isSystemDatabase(dbName)) { + return errUndefinedPartitionMethod(tbName); + } else { + rlock.lock(); + try { + if (store.existPartitions(dbName, tbName)) { + return OK; + } else { + return errUndefinedPartitions(tbName); + } + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return returnError(t); + } finally { + rlock.unlock(); + } + } + } + + @Override public GetPartitionDescResponse getPartitionByPartitionName(RpcController controller, PartitionIdentifierProto request) throws ServiceException { @@ -973,7 +1007,7 @@ public class CatalogServer extends AbstractService { } @Override - public GetPartitionsResponse getPartitionsByTableName(RpcController controller, PartitionIdentifierProto request) + public GetPartitionsResponse getPartitionsByTableName(RpcController controller, TableIdentifierProto request) throws ServiceException { String dbName = request.getDatabaseName(); String tbName = request.getTableName(); @@ -996,7 +1030,7 @@ public class CatalogServer extends AbstractService { rlock.lock(); try { - List<PartitionDescProto> partitions = store.getPartitions(dbName, tbName); + List<PartitionDescProto> partitions = store.getPartitionsOfTable(dbName, tbName); GetPartitionsResponse.Builder builder = GetPartitionsResponse.newBuilder(); for (PartitionDescProto partition : partitions) { @@ -1043,6 +1077,41 @@ public class CatalogServer extends AbstractService { } @Override + public GetPartitionsResponse getPartitionsByAlgebra(RpcController controller, + PartitionsByAlgebraProto request) throws ServiceException { + String dbName = request.getDatabaseName(); + String tbName = request.getTableName(); + + // linked meta data do not support partition. + // So, the request that wants to get partitions in this db will be failed. 
+ if (linkedMetadataManager.existsDatabase(dbName)) { + return GetPartitionsResponse.newBuilder().setState(errUndefinedPartitionMethod(tbName)).build(); + } + + if (metaDictionary.isSystemDatabase(dbName)) { + return GetPartitionsResponse.newBuilder().setState(errUndefinedPartitionMethod(tbName)).build(); + } else { + rlock.lock(); + try { + GetPartitionsResponse.Builder builder = GetPartitionsResponse.newBuilder(); + List<PartitionDescProto> partitions = store.getPartitionsByAlgebra(request); + builder.addAllPartition(partitions); + builder.setState(OK); + return builder.build(); + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + + return GetPartitionsResponse.newBuilder() + .setState(returnError(t)) + .build(); + + } finally { + rlock.unlock(); + } + } + } + + @Override public ReturnState addPartitions(RpcController controller, AddPartitionsProto request) { TableIdentifierProto identifier = request.getTableIdentifier(); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index d46ab3c..19bb678 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -25,25 +25,28 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.algebra.JsonHelper; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import 
org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.common.TajoDataTypes.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.*; import org.apache.tajo.util.JavaResourceUtil; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.util.PartitionFilterAlgebraVisitor; +import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import java.io.IOException; import java.net.URI; import java.sql.*; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.sql.Date; +import java.util.*; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; @@ -57,7 +60,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo protected final String catalogUri; protected final String insertPartitionSql = "INSERT INTO " + TB_PARTTIONS - + "(" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?, ? , ?)"; + + "(" + COL_TABLES_PK + ", PARTITION_NAME, PATH, " + COL_PARTITION_BYTES + + ") VALUES (?, ? 
, ?, ?)"; protected final String insertPartitionKeysSql = "INSERT INTO " + TB_PARTTION_KEYS + "(" + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", " @@ -1380,6 +1384,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo pstmt1.setInt(1, tableId); pstmt1.setString(2, partition.getPartitionName()); pstmt1.setString(3, partition.getPath()); + pstmt1.setLong(4, partition.getNumBytes()); pstmt1.executeUpdate(); pstmt2 = conn.prepareStatement(insertPartitionKeysSql); @@ -2133,7 +2138,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo PartitionDescProto.Builder builder = null; try { - String sql = "SELECT PATH, " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + String sql = "SELECT PATH, " + COL_PARTITIONS_PK + ", " + COL_PARTITION_BYTES + " FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? "; if (LOG.isDebugEnabled()) { @@ -2151,6 +2156,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo builder.setId(res.getInt(COL_PARTITIONS_PK)); builder.setPath(res.getString("PATH")); builder.setPartitionName(partitionName); + builder.setNumBytes(res.getLong(COL_PARTITION_BYTES)); setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); } else { throw new UndefinedPartitionException(partitionName); @@ -2191,9 +2197,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } @Override - public List<PartitionDescProto> getPartitions(String databaseName, String tableName) + public List<PartitionDescProto> getPartitionsOfTable(String databaseName, String tableName) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException { - Connection conn = null; ResultSet res = null; PreparedStatement pstmt = null; @@ -2205,8 +2210,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo ensurePartitionTable(tableName, tableId); try { - String sql = "SELECT PATH, PARTITION_NAME, 
" + COL_PARTITIONS_PK + " FROM " - + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? "; + String sql = "SELECT PATH, PARTITION_NAME, " + COL_PARTITIONS_PK + ", " + COL_PARTITION_BYTES + + " FROM " + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { LOG.debug(sql); @@ -2221,6 +2226,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo builder = PartitionDescProto.newBuilder(); builder.setPath(res.getString("PATH")); builder.setPartitionName(res.getString("PARTITION_NAME")); + builder.setNumBytes(res.getLong(COL_PARTITION_BYTES)); setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); partitions.add(builder.build()); } @@ -2233,6 +2239,256 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } @Override + public boolean existPartitions(String databaseName, String tableName) throws UndefinedDatabaseException, + UndefinedTableException, UndefinedPartitionMethodException { + + String sql = null; + Connection conn = null; + ResultSet res = null; + PreparedStatement pstmt = null; + boolean result = false; + + final int databaseId = getDatabaseId(databaseName); + final int tableId = getTableId(databaseId, databaseName, tableName); + ensurePartitionTable(tableName, tableId); + + try { + if (this instanceof DerbyStore) { + sql = "SELECT 1 FROM " + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? FETCH FIRST ROW ONLY "; + } else { + sql = "SELECT 1 FROM " + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? 
LIMIT 1 "; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + res = pstmt.executeQuery(); + + if (res.next()) { + result = true; + } + } catch (SQLException se) { + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(pstmt, res); + } + return result; + } + + @Override + public List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UnsupportedException { + Connection conn = null; + PreparedStatement pstmt = null; + ResultSet res = null; + int currentIndex = 1; + String selectStatement = null; + Pair<String, List<PartitionFilterSet>> pair = null; + + List<PartitionDescProto> partitions = TUtil.newList(); + List<PartitionFilterSet> filterSets = null; + + int databaseId = getDatabaseId(request.getDatabaseName()); + int tableId = getTableId(databaseId, request.getDatabaseName(), request.getTableName()); + if (!existPartitionMethod(request.getDatabaseName(), request.getTableName())) { + throw new UndefinedPartitionMethodException(request.getTableName()); + } + + try { + TableDescProto tableDesc = getTable(request.getDatabaseName(), request.getTableName()); + + pair = getSelectStatementAndPartitionFilterSet(tableDesc.getTableName(), tableDesc.getPartition() + .getExpressionSchema().getFieldsList(), request.getAlgebra()); + + selectStatement = pair.getFirst(); + filterSets = pair.getSecond(); + + conn = getConnection(); + pstmt = conn.prepareStatement(selectStatement); + + // Set table id by force because first parameter of all direct sql is table id + pstmt.setInt(currentIndex, tableId); + currentIndex++; + + for (PartitionFilterSet filter : filterSets) { + // Set table id by force because all filters have table id as first parameter. 
+ pstmt.setInt(currentIndex, tableId); + currentIndex++; + + for (Pair<Type, Object> parameter : filter.getParameters()) { + switch (parameter.getFirst()) { + case BOOLEAN: + pstmt.setBoolean(currentIndex, (Boolean)parameter.getSecond()); + break; + case INT8: + pstmt.setLong(currentIndex, (Long) parameter.getSecond()); + break; + case FLOAT8: + pstmt.setDouble(currentIndex, (Double) parameter.getSecond()); + break; + case DATE: + pstmt.setDate(currentIndex, (Date) parameter.getSecond()); + break; + case TIMESTAMP: + pstmt.setTimestamp(currentIndex, (Timestamp) parameter.getSecond()); + break; + case TIME: + pstmt.setTime(currentIndex, (Time) parameter.getSecond()); + break; + default: + pstmt.setString(currentIndex, (String) parameter.getSecond()); + break; + } + currentIndex++; + } + } + + res = pstmt.executeQuery(); + + while (res.next()) { + PartitionDescProto.Builder builder = PartitionDescProto.newBuilder(); + + builder.setId(res.getInt(COL_PARTITIONS_PK)); + builder.setPartitionName(res.getString("PARTITION_NAME")); + builder.setPath(res.getString("PATH")); + builder.setNumBytes(res.getLong(COL_PARTITION_BYTES)); + + partitions.add(builder.build()); + } + } catch (SQLException se) { + throw new TajoInternalError(se); + } finally { + CatalogUtil.closeQuietly(pstmt, res); + } + + return partitions; + } + + /** + * Create a select statement and parameters for querying partitions and partition keys in CatalogStore. + * + * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). + * Assume that an user gives a condition WHERE (col1 ='1' or col1 = '100') and col3 > 20. + * There is no filter condition corresponding to col2. + * + * Then, the sql would be generated as following: + * + * SELECT A.PARTITION_ID, A.PARTITION_NAME, A.PATH FROM PARTITIONS A + * WHERE A.TID = ? 
+ * AND A.PARTITION_ID IN ( + * SELECT T1.PARTITION_ID FROM PARTITION_KEYS T1 + * JOIN PARTITION_KEYS T2 ON T1.TID=T2.TID AND T1.PARTITION_ID = T2.PARTITION_ID AND T2.TID = ? + * AND ( T2.COLUMN_NAME = 'col2' AND T2.PARTITION_VALUE IS NOT NULL ) + * JOIN PARTITION_KEYS T3 ON T1.TID=T3.TID AND T1.PARTITION_ID = T3.PARTITION_ID AND T3.TID = ? + * AND ( T3.COLUMN_NAME = 'col3' AND T3.PARTITION_VALUE > ? ) + * WHERE T1.TID = ? AND ( T1.COLUMN_NAME = 'col1' AND T1.PARTITION_VALUE = ? ) + * OR ( T1.COLUMN_NAME = 'col1' AND T1.PARTITION_VALUE = ? ) + ) + * + * @param tableName the table name + * @param partitionColumns list of partition column + * @param json the algebra expression + * @return the select statement and partition filter sets + */ + private Pair<String, List<PartitionFilterSet>> getSelectStatementAndPartitionFilterSet(String tableName, + List<ColumnProto> partitionColumns, String json) { + + Pair<String, List<PartitionFilterSet>> result = null; + Expr[] exprs = null; + + try { + List<PartitionFilterSet> filterSets = TUtil.newList(); + + if (json != null && !json.isEmpty()) { + Expr algebra = JsonHelper.fromJson(json, Expr.class); + exprs = AlgebraicUtil.toConjunctiveNormalFormArray(algebra); + } + + // Write table alias for all levels + String tableAlias; + + PartitionFilterAlgebraVisitor visitor = new PartitionFilterAlgebraVisitor(); + visitor.setIsHiveCatalog(false); + + Expr[] filters = AlgebraicUtil.getRearrangedCNFExpressions(tableName, partitionColumns, exprs); + + StringBuffer sb = new StringBuffer(); + sb.append("\n SELECT A.").append(CatalogConstants.COL_PARTITIONS_PK) + .append(", A.PARTITION_NAME, A.PATH ").append(", ").append(COL_PARTITION_BYTES) + .append(" FROM ").append(CatalogConstants.TB_PARTTIONS).append(" A ") + .append("\n WHERE A.").append(CatalogConstants.COL_TABLES_PK).append(" = ? 
") + .append("\n AND A.").append(CatalogConstants.COL_PARTITIONS_PK).append(" IN (") + .append("\n SELECT T1.").append(CatalogConstants.COL_PARTITIONS_PK) + .append(" FROM ").append(CatalogConstants.TB_PARTTION_KEYS).append(" T1 "); + + // Write join clause from second column to last column. + Column target; + + for (int i = 1; i < partitionColumns.size(); i++) { + target = new Column(partitionColumns.get(i)); + tableAlias = "T" + (i+1); + + visitor.setColumn(target); + visitor.setTableAlias(tableAlias); + visitor.visit(null, new Stack<Expr>(), filters[i]); + + sb.append("\n JOIN ").append(CatalogConstants.TB_PARTTION_KEYS).append(" ").append(tableAlias) + .append(" ON T1.").append(CatalogConstants.COL_TABLES_PK).append("=") + .append(tableAlias).append(".").append(CatalogConstants.COL_TABLES_PK) + .append(" AND T1.").append(CatalogConstants.COL_PARTITIONS_PK) + .append(" = ").append(tableAlias).append(".").append(CatalogConstants.COL_PARTITIONS_PK) + .append(" AND ").append(tableAlias).append(".").append(CatalogConstants.COL_TABLES_PK).append(" = ? AND "); + sb.append(visitor.getResult()); + + // Set parameters for executing PrepareStament + PartitionFilterSet filterSet = new PartitionFilterSet(); + filterSet.setColumnName(target.getSimpleName()); + + List<Pair<Type, Object>> list = TUtil.newList(); + list.addAll(visitor.getParameters()); + filterSet.addParameters(list); + + filterSets.add(filterSet); + visitor.clearParameters(); + } + + // Write where clause for first column + target = new Column(partitionColumns.get(0)); + tableAlias = "T1"; + visitor.setColumn(target); + visitor.setTableAlias(tableAlias); + visitor.visit(null, new Stack<Expr>(), filters[0]); + + sb.append("\n WHERE T1.").append(CatalogConstants.COL_TABLES_PK).append(" = ? 
AND "); + sb.append(visitor.getResult()) + .append("\n )"); + sb.append("\n ORDER BY A.PARTITION_NAME"); + + // Set parameters for executing PrepareStament + PartitionFilterSet filterSet = new PartitionFilterSet(); + filterSet.setColumnName(target.getSimpleName()); + + List<Pair<Type, Object>> list = TUtil.newList(); + list.addAll(visitor.getParameters()); + filterSet.addParameters(list); + + filterSets.add(filterSet); + + result = new Pair<>(sb.toString(), filterSets); + } catch (TajoException e) { + throw new TajoInternalError(e); + } + + return result; + } + + + @Override public List<TablePartitionProto> getAllPartitions() { Connection conn = null; Statement stmt = null; @@ -2323,6 +2579,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo pstmt3.setInt(1, tableId); pstmt3.setString(2, partition.getPartitionName()); pstmt3.setString(3, partition.getPath()); + pstmt3.setLong(4, partition.getNumBytes()); pstmt3.addBatch(); pstmt3.clearParameters(); @@ -2951,4 +3208,33 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo return exist; } + + class PartitionFilterSet { + private String columnName; + private List<Pair<Type, Object>> parameters; + + public PartitionFilterSet() { + parameters = TUtil.newList(); + } + + public String getColumnName() { + return columnName; + } + + public void setColumnName(String columnName) { + this.columnName = columnName; + } + + public List<Pair<Type, Object>> getParameters() { + return parameters; + } + + public void setParameters(List<Pair<Type, Object>> parameters) { + this.parameters = parameters; + } + + public void addParameters(List<Pair<Type, Object>> parameters) { + this.parameters.addAll(parameters); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java ---------------------------------------------------------------------- diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index a067a53..5288979 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -92,18 +92,96 @@ public interface CatalogStore extends Closeable { /************************** PARTITIONS *****************************/ /** + * Check if list of partitions exist on catalog. + * + * @param databaseName + * @param tableName + * @return + * @throws UndefinedDatabaseException + * @throws UndefinedTableException + * @throws UndefinedPartitionMethodException + */ + boolean existPartitions(String databaseName, String tableName) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException; + + /** * Get all partitions of a table * @param tableName the table name * @return * @throws TajoException */ - List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName) throws - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException; + List<CatalogProtos.PartitionDescProto> getPartitionsOfTable(String databaseName, String tableName) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UnsupportedException; CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, - String partitionName) - throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionException, - UndefinedPartitionMethodException; + String partitionName) throws UndefinedDatabaseException, + UndefinedTableException, UndefinedPartitionMethodException, UndefinedPartitionException; + + /** + * Get list of partitions matching the specified algebra expression.
+ * + * For example, suppose you have a table partitioned by three columns (i.e., col1, col2, col3). + * Assume that a user wants to apply the condition WHERE (col1 ='1' or col1 = '100') and col3 > 20. + * + * Then, the algebra expression would be written as follows: + * + * { + * "LeftExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col3", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "20", + * "ValueType": "Unsigned_Integer", + * "OpType": "Literal" + * }, + * "OpType": "GreaterThan" + * }, + * "RightExpr": { + * "LeftExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col1", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "1", + * "ValueType": "String", + * "OpType": "Literal" + * }, + * "OpType": "Equals" + * }, + * "RightExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col1", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "100", + * "ValueType": "String", + * "OpType": "Literal" + * }, + * "OpType": "Equals" + * }, + * "OpType": "Or" + * }, + * "OpType": "And" + * } + * + * @param request the database name, the table name, the algebra expression + * @return list of PartitionDescProto + * @throws UndefinedDatabaseException + * @throws UndefinedTableException + * @throws UndefinedPartitionMethodException + * @throws UndefinedOperatorException + */ + List<PartitionDescProto> getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UndefinedOperatorException, UnsupportedException; List<TablePartitionProto> getAllPartitions(); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index 5280127..921d98e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 11 - 2015-09-23: Add contents length and file count for partition directory (TAJO-1493) * 10 - 2015-09-22: Well support for self-describing data formats (TAJO-1832) * 9 - 2015-09-12: Allow external catalog store for unit testing (TAJO-1813) * 8 - 2015-09-02: Wrong table type problem in catalog (TAJO-1808) @@ -30,7 +31,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="10"> + <tns:base version="11"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -175,6 +176,7 @@ TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE, PARTITION_NAME VARCHAR(767), PATH VARCHAR(1024), + NUM_BYTES BIGINT, CONSTRAINT C_PARTITIONS_PK PRIMARY KEY (PARTITION_ID), CONSTRAINT C_PARTITIONS_UNIQ UNIQUE (TID, PARTITION_NAME) )]]> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml index 0a7bfa2..a23d393 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml @@ -19,6 +19,7 @@ <tns:store 
xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 11 - 2015-09-23: Add contents length and file count for partition directory (TAJO-1493) * 10 - 2015-09-22: Well support for self-describing data formats (TAJO-1832) * 9 - 2015-09-12: Allow external catalog store for unit testing (TAJO-1813) * 8 - 2015-09-02: Wrong table type problem in catalog (TAJO-1808) @@ -30,7 +31,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="10"> + <tns:base version="11"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -152,6 +153,7 @@ TID INT NOT NULL, PARTITION_NAME VARCHAR(255) BINARY, PATH VARCHAR(4096) BINARY, + NUM_BYTES BIGINT, UNIQUE INDEX PARTITION_UNIQUE_IDX (TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml index 1113b3e..126a5e4 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 11 - 2015-09-23: Add contents length and file count for partition directory (TAJO-1493) * 10 - 2015-09-22: Well support for 
self-describing data formats (TAJO-1832) * 9 - 2015-09-12: Allow external catalog store for unit testing (TAJO-1813) * 8 - 2015-09-02: Wrong table type problem in catalog (TAJO-1808) @@ -30,7 +31,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="10"> + <tns:base version="11"> <tns:objects> <tns:Object order="0" type="table" name="META"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -156,6 +157,7 @@ TID INT NOT NULL, PARTITION_NAME VARCHAR(255) BINARY, PATH VARCHAR(4096) BINARY, + NUM_BYTES BIGINT, UNIQUE INDEX PARTITION_UNIQUE_IDX (TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml index 45d072e..b5396ef 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml @@ -19,6 +19,7 @@ <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 11 - 2015-09-23: Add contents length and file count for partition directory (TAJO-1493) * 10 - 2015-09-22: Well support for self-describing data formats (TAJO-1832) * 9 - 2015-09-12: Allow external catalog store for unit testing (TAJO-1813) * 8 - 2015-09-02: Wrong table type problem in catalog (TAJO-1808) @@ -30,7 +31,7 @@ * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="10"> + <tns:base version="11"> <tns:objects> <tns:Object order="0" 
type="table" name="meta"> <tns:sql><![CDATA[ @@ -209,6 +210,7 @@ TID INT NOT NULL, PARTITION_NAME VARCHAR2(767), PATH VARCHAR2(4000), + NUM_BYTES NUMBER(38), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, CONSTRAINT C_PARTITIONS_UNIQ UNIQUE (TID, PARTITION_NAME) )]]> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index 231dc20..033c233 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -21,6 +21,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd "> <!-- Catalog base version history + * 11 - 2015-09-23: Add contents length and file count for partition directory (TAJO-1493) * 10 - 2015-09-22: Well support for self-describing data formats (TAJO-1832) * 9 - 2015-09-12: Allow external catalog store for unit testing (TAJO-1813) * 8 - 2015-09-02: Wrong table type problem in catalog (TAJO-1808) @@ -33,7 +34,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. * 2 - 2014-06-09: First versioning * 1- Before 2013-03-20 --> - <tns:base version="10"> + <tns:base version="11"> <tns:objects> <tns:Object name="META" type="table" order="0"> <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql> @@ -172,6 +173,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. 
PARTITION_NAME VARCHAR(128), PARTITION_VALUE VARCHAR(1024), PATH VARCHAR(4096), + NUM_BYTES BIGINT, FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, UNIQUE (TID, PARTITION_NAME) )]]> http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java index 07130cc..a4dd9c3 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/CatalogTestingUtil.java @@ -160,8 +160,8 @@ public class CatalogTestingUtil { } PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder(); - builder.setColumnName(partitionValue); - builder.setPartitionValue(columnName); + builder.setColumnName(columnName); + builder.setPartitionValue(partitionValue); partitionKeyList.add(builder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index a5e4861..c9dd24f 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -59,7 +59,7 @@ public class TestCatalog { static CatalogServer server; static CatalogService catalog; - @BeforeClass + @BeforeClass public static void setUp() throws Exception { server 
= new MiniCatalogServer(); @@ -590,7 +590,7 @@ public class TestCatalog { assertEquals(retrived.getFunctionName(),"test2"); assertEquals(retrived.getLegacyFuncClass(),TestFunc1.class); - assertEquals(retrived.getFuncType(),FunctionType.UDF); + assertEquals(retrived.getFuncType(), FunctionType.UDF); } @Test @@ -785,7 +785,8 @@ public class TestCatalog { .addColumn("age", Type.INT4) .addColumn("score", Type.FLOAT8); - String tableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "addedtable"); + String simpleTableName = "addedtable"; + String tableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, simpleTableName); KeyValueSet opts = new KeyValueSet(); opts.set("file.delimiter", ","); TableMeta meta = CatalogUtil.newTableMeta("TEXT", opts); @@ -800,7 +801,7 @@ public class TestCatalog { TableDesc desc = new TableDesc(tableName, schema, meta, - new Path(CommonTestingUtil.getTestDir(), "addedtable").toUri()); + new Path(CommonTestingUtil.getTestDir(), simpleTableName).toUri()); desc.setPartitionMethod(partitionMethodDesc); assertFalse(catalog.existsTable(tableName)); catalog.createTable(desc); @@ -815,14 +816,17 @@ public class TestCatalog { testAddPartition(tableName, "id=10/name=aaa"); testAddPartition(tableName, "id=20/name=bbb"); - List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, "addedtable"); + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, simpleTableName); assertNotNull(partitions); assertEquals(partitions.size(), 2); + assertEquals(partitions.get(0).getNumBytes(), 0L); + + testGetPartitionsByAlgebra(DEFAULT_DATABASE_NAME, simpleTableName); testDropPartition(tableName, "id=10/name=aaa"); testDropPartition(tableName, "id=20/name=bbb"); - partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, "addedtable"); + partitions = catalog.getPartitionsOfTable(DEFAULT_DATABASE_NAME, simpleTableName); assertNotNull(partitions); assertEquals(partitions.size(), 
0); @@ -830,6 +834,99 @@ public class TestCatalog { assertFalse(catalog.existsTable(tableName)); } + private void testGetPartitionsByAlgebra(String databaseName, String tableName) throws Exception { + String qfTableName = databaseName + "." + tableName; + + // Equals Operator + CatalogProtos.PartitionsByAlgebraProto.Builder request = CatalogProtos.PartitionsByAlgebraProto.newBuilder(); + request.setDatabaseName(databaseName); + request.setTableName(tableName); + + String algebra = "{\n" + + " \"LeftExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"id\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"10\",\n" + + " \"ValueType\": \"Unsigned_Integer\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"name\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"aaa\",\n" + + " \"ValueType\": \"String\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"Equals\"\n" + + " },\n" + + " \"OpType\": \"And\"\n" + + "}"; + + request.setAlgebra(algebra); + + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsByAlgebra(request.build()); + assertNotNull(partitions); + assertEquals(1, partitions.size()); + + // GreaterThan Operator and InPredicate Operator + algebra = "{\n" + + " \"LeftExpr\": {\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\": \"" + qfTableName + "\",\n" + + " \"ColumnName\": \"id\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Value\": \"0\",\n" + + " \"ValueType\": \"Unsigned_Integer\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " \"OpType\": \"GreaterThan\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"IsNot\": false,\n" + + " \"LeftExpr\": {\n" + + " \"Qualifier\":
\"" + qfTableName + "\",\n" + + " \"ColumnName\": \"name\",\n" + + " \"OpType\": \"Column\"\n" + + " },\n" + + " \"RightExpr\": {\n" + + " \"Values\": [\n" + + " {\n" + + " \"Value\": \"aaa\",\n" + + " \"ValueType\": \"String\",\n" + + " \"OpType\": \"Literal\"\n" + + " },\n" + + " {\n" + + " \"Value\": \"bbb\",\n" + + " \"ValueType\": \"String\",\n" + + " \"OpType\": \"Literal\"\n" + + " }\n" + + " ],\n" + + " \"OpType\": \"ValueList\"\n" + + " },\n" + + " \"OpType\": \"InPredicate\"\n" + + " },\n" + + " \"OpType\": \"And\"\n" + + "}"; + + request.setAlgebra(algebra); + + partitions = catalog.getPartitionsByAlgebra(request.build()); + assertNotNull(partitions); + assertEquals(2, partitions.size()); + } + private void testAddPartition(String tableName, String partitionName) throws Exception { AlterTableDesc alterTableDesc = new AlterTableDesc(); alterTableDesc.setTableName(tableName); http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java index 594f5ce..0ca6baa 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java @@ -194,7 +194,7 @@ public class TestCatalogAgainstCaseSensitivity { // Test get partitions of a table ////////////////////////////////////////////////////////////////////////////// - List<PartitionDescProto> partitionDescs = catalog.getPartitions("TestDatabase1", "TestPartition1"); + List<PartitionDescProto> partitionDescs = 
catalog.getPartitionsOfTable("TestDatabase1", "TestPartition1"); assertEquals(2, partitionDescs.size()); Map<String, PartitionDescProto> tablePartitionMap = new HashMap<>(); for (PartitionDescProto eachPartition : partitionDescs) { http://git-wip-us.apache.org/repos/asf/tajo/blob/51bb94d9/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java index a50ce7b..99288e8 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java @@ -175,8 +175,9 @@ public class TajoDump { Collections.sort(tableNames); for (String tableName : tableNames) { try { - TableDesc table = client.getTableDesc(CatalogUtil.buildFQName(databaseName, tableName)); - + String fqName = CatalogUtil.buildFQName(databaseName, tableName); + TableDesc table = client.getTableDesc(fqName); + if (table.getMeta().getStoreType().equalsIgnoreCase("SYSTEM")) { continue; } @@ -186,13 +187,12 @@ public class TajoDump { } else { writer.write(DDLBuilder.buildDDLForBaseTable(table)); } - if (table.hasPartition()) { writer.write("\n\n"); writer.write("--\n"); writer.write(String.format("-- Table Partitions: %s%n", tableName)); writer.write("--\n"); - List<PartitionDescProto> partitionProtos = client.getAllPartitions(tableName); + List<PartitionDescProto> partitionProtos = client.getPartitionsOfTable(fqName); for (PartitionDescProto eachPartitionProto : partitionProtos) { writer.write(DDLBuilder.buildDDLForAddPartition(table, eachPartitionProto)); }
