http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 2d35496..2edb4c4 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -26,14 +26,17 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; +import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; @@ -61,7 +64,6 @@ public class MemStore implements CatalogStore { public MemStore(Configuration conf) { } - public void close() throws IOException { databases.clear(); @@ -147,6 +149,8 @@ public class MemStore implements CatalogStore { } databases.put(databaseName, new HashMap<String, CatalogProtos.TableDescProto>()); + indexes.put(databaseName, new HashMap<String, IndexDescProto>()); + indexesByColumn.put(databaseName, new HashMap<String, IndexDescProto>()); } @Override @@ -160,6 +164,8 @@ public class MemStore implements CatalogStore { throw new NoSuchDatabaseException(databaseName); } databases.remove(databaseName); + indexes.remove(databaseName); + indexesByColumn.remove(databaseName); } @Override @@ -539,17 +545,23 @@ public class MemStore implements CatalogStore { @Override public void createIndex(IndexDescProto proto) throws CatalogException { final String databaseName = proto.getTableIdentifier().getDatabaseName(); + final String tableName = CatalogUtil.extractSimpleName(proto.getTableIdentifier().getTableName()); Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName); Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); + TableDescProto tableDescProto = getTable(databaseName, tableName); - if (index.containsKey(proto.getName())) { - throw new AlreadyExistsIndexException(proto.getName()); + if (index.containsKey(proto.getIndexName())) { + throw new AlreadyExistsIndexException(proto.getIndexName()); } - index.put(proto.getName(), proto); - indexByColumn.put(proto.getTableIdentifier().getTableName() + "." - + CatalogUtil.extractSimpleName(proto.getColumn().getName()), proto); + index.put(proto.getIndexName(), proto); + String originalTableName = proto.getTableIdentifier().getTableName(); + String simpleTableName = CatalogUtil.extractSimpleName(originalTableName); + indexByColumn.put(CatalogUtil.buildFQName(proto.getTableIdentifier().getDatabaseName(), + simpleTableName, + getUnifiedNameForIndexByColumn(proto)), + proto); } /* (non-Javadoc) @@ -558,10 +570,19 @@ public class MemStore implements CatalogStore { @Override public void dropIndex(String databaseName, String indexName) throws CatalogException { Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName); + Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); if (!index.containsKey(indexName)) { throw new NoSuchIndexException(indexName); } + IndexDescProto proto = index.get(indexName); + final String tableName = CatalogUtil.extractSimpleName(proto.getTableIdentifier().getTableName()); + TableDescProto tableDescProto = getTable(databaseName, tableName); index.remove(indexName); + String originalTableName = proto.getTableIdentifier().getTableName(); + String simpleTableName = CatalogUtil.extractSimpleName(originalTableName); + indexByColumn.remove(CatalogUtil.buildFQName(proto.getTableIdentifier().getDatabaseName(), + simpleTableName, + getUnifiedNameForIndexByColumn(proto))); } /* (non-Javadoc) @@ -577,19 +598,18 @@ public class MemStore implements CatalogStore { return index.get(indexName); } - /* (non-Javadoc) - * @see CatalogStore#getIndexByName(java.lang.String, java.lang.String) - */ @Override - public IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName) - throws CatalogException { - + public IndexDescProto getIndexByColumns(String databaseName, String tableName, String[] columnNames) throws CatalogException { Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); - if (!indexByColumn.containsKey(columnName)) { - throw new NoSuchIndexException(columnName); + String simpleTableName = CatalogUtil.extractSimpleName(tableName); + TableDescProto tableDescProto = getTable(databaseName, simpleTableName); + String qualifiedColumnName = CatalogUtil.buildFQName(databaseName, simpleTableName, + CatalogUtil.getUnifiedSimpleColumnName(new Schema(tableDescProto.getSchema()), columnNames)); + if (!indexByColumn.containsKey(qualifiedColumnName)) { + throw new NoSuchIndexException(qualifiedColumnName); } - return indexByColumn.get(columnName); + return indexByColumn.get(qualifiedColumnName); } @Override @@ -599,50 +619,47 @@ public class MemStore implements CatalogStore { } @Override - public boolean existIndexByColumn(String databaseName, String tableName, String columnName) - throws CatalogException { + public boolean existIndexByColumns(String databaseName, String tableName, String[] columnNames) throws CatalogException { Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); - return indexByColumn.containsKey(columnName); + TableDescProto tableDescProto = getTable(databaseName, tableName); + return indexByColumn.containsKey( + CatalogUtil.buildFQName(databaseName, CatalogUtil.extractSimpleName(tableName), + CatalogUtil.getUnifiedSimpleColumnName(new Schema(tableDescProto.getSchema()), columnNames))); } @Override - public IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException { - List<IndexDescProto> protos = new ArrayList<IndexDescProto>(); + public List<String> getAllIndexNamesByTable(String databaseName, String tableName) throws CatalogException { + List<String> indexNames = new ArrayList<String>(); Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); + String simpleTableName = CatalogUtil.extractSimpleName(tableName); for (IndexDescProto proto : indexByColumn.values()) { - if (proto.getTableIdentifier().getTableName().equals(tableName)) { - protos.add(proto); + if (proto.getTableIdentifier().getTableName().equals(simpleTableName)) { + indexNames.add(proto.getIndexName()); } } - return protos.toArray(new IndexDescProto[protos.size()]); + return indexNames; } - + @Override - public List<IndexProto> getAllIndexes() throws CatalogException { - List<IndexProto> indexList = new ArrayList<CatalogProtos.IndexProto>(); - Set<String> databases = indexes.keySet(); - - for (String databaseName: databases) { - Map<String, IndexDescProto> indexMap = indexes.get(databaseName); - - for (String indexName: indexMap.keySet()) { - IndexDescProto indexDesc = indexMap.get(indexName); - IndexProto.Builder builder = IndexProto.newBuilder(); - - builder.setColumnName(indexDesc.getColumn().getName()); - builder.setDataType(indexDesc.getColumn().getDataType().getType().toString()); - builder.setIndexName(indexName); - builder.setIndexType(indexDesc.getIndexMethod().toString()); - builder.setIsAscending(indexDesc.hasIsAscending() && indexDesc.getIsAscending()); - builder.setIsClustered(indexDesc.hasIsClustered() && indexDesc.getIsClustered()); - builder.setIsUnique(indexDesc.hasIsUnique() && indexDesc.getIsUnique()); - - indexList.add(builder.build()); + public boolean existIndexesByTable(String databaseName, String tableName) throws CatalogException { + Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); + String simpleTableName = CatalogUtil.extractSimpleName(tableName); + for (IndexDescProto proto : indexByColumn.values()) { + if (proto.getTableIdentifier().getTableName().equals(simpleTableName)) { + return true; } } - - return indexList; + return false; + } + + @Override + public List<IndexDescProto> getAllIndexes() throws CatalogException { + List<IndexDescProto> indexDescProtos = TUtil.newList(); + for (Map<String,IndexDescProto> indexMap : indexes.values()) { + indexDescProtos.addAll(indexMap.values()); + } + return indexDescProtos; } @Override @@ -666,4 +683,13 @@ public class MemStore implements CatalogStore { return null; } + public static String getUnifiedNameForIndexByColumn(IndexDescProto proto) { + StringBuilder sb = new StringBuilder(); + for (SortSpecProto columnSpec : proto.getKeySortSpecsList()) { + String[] identifiers = columnSpec.getColumn().getName().split(CatalogConstants.IDENTIFIER_DELIMITER_REGEXP); + sb.append(identifiers[identifiers.length-1]).append("_"); + } + sb.deleteCharAt(sb.length()-1); + return sb.toString(); + } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index b0ba3b9..5e19ac1 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -97,17 +97,20 @@ <tns:Object order="11" type="table" name="INDEXES"> <tns:sql><![CDATA[ CREATE TABLE INDEXES ( - DB_ID INT NOT NULL REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, - TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE, - INDEX_NAME VARCHAR(128) NOT NULL, - COLUMN_NAME VARCHAR(128) NOT NULL, - DATA_TYPE VARCHAR(128) NOT NULL, - INDEX_TYPE CHAR(32) NOT NULL, - PATH VARCHAR(4096), - IS_UNIQUE BOOLEAN NOT NULL, - IS_CLUSTERED BOOLEAN NOT NULL, - IS_ASCENDING BOOLEAN NOT NULL, - CONSTRAINT C_INDEXES_PK PRIMARY KEY (DB_ID, INDEX_NAME) + INDEX_ID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), + DB_ID INT NOT NULL REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, + TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE, + INDEX_NAME VARCHAR(128) NOT NULL, + INDEX_TYPE CHAR(32) NOT NULL, + PATH VARCHAR(4096), + COLUMN_NAMES VARCHAR(128) NOT NULL, -- array of column names + DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types + ORDERS VARCHAR(128) NOT NULL, -- array of column orders + NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings + IS_UNIQUE BOOLEAN NOT NULL, + IS_CLUSTERED BOOLEAN NOT NULL, + CONSTRAINT INDEXES_PK PRIMARY KEY (INDEX_ID), + CONSTRAINT C_INDEXES_UNIQ UNIQUE (DB_ID, INDEX_NAME) )]]> </tns:sql> </tns:Object> @@ -115,7 +118,7 @@ <tns:sql><![CDATA[CREATE UNIQUE INDEX idx_indexes_pk ON INDEXES (DB_ID,index_name)]]></tns:sql> </tns:Object> <tns:Object order="13" type="index" name="IDX_INDEXES_COLUMNS" dependsOn="INDEXES"> - <tns:sql><![CDATA[CREATE INDEX idx_indexes_columns ON INDEXES (TID,column_name)]]></tns:sql> + <tns:sql><![CDATA[CREATE INDEX idx_col_names ON INDEXES (DB_ID,TID,column_names)]]></tns:sql> </tns:Object> <tns:Object order="14" type="table" name="STATS"> <tns:sql><![CDATA[ http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql index 33bf0f5..9c7f8ba 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql @@ -1,17 +1,19 @@ CREATE TABLE INDEXES ( + INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, DB_ID INT NOT NULL, TID INT NOT NULL, INDEX_NAME VARCHAR(128) NOT NULL, - COLUMN_NAME VARCHAR(128) NOT NULL, - DATA_TYPE VARCHAR(128) NOT NULL, INDEX_TYPE CHAR(32) NOT NULL, - PATH VARCHAR(4096), + PATH VARCHAR(4096) NOT NULL, + COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names + DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types + ORDERS VARCHAR(128) NOT NULL, -- array of column orders + NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings IS_UNIQUE BOOLEAN NOT NULL, IS_CLUSTERED BOOLEAN NOT NULL, - IS_ASCENDING BOOLEAN NOT NULL, - PRIMARY KEY (DB_ID, INDEX_NAME), + PRIMARY KEY (INDEX_ID), FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME), - INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME) + INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES) ) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql index 33bf0f5..9c7f8ba 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql @@ -1,17 +1,19 @@ CREATE TABLE INDEXES ( + INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, DB_ID INT NOT NULL, TID INT NOT NULL, INDEX_NAME VARCHAR(128) NOT NULL, - COLUMN_NAME VARCHAR(128) NOT NULL, - DATA_TYPE VARCHAR(128) NOT NULL, INDEX_TYPE CHAR(32) NOT NULL, - PATH VARCHAR(4096), + PATH VARCHAR(4096) NOT NULL, + COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names + DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types + ORDERS VARCHAR(128) NOT NULL, -- array of column orders + NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings IS_UNIQUE BOOLEAN NOT NULL, IS_CLUSTERED BOOLEAN NOT NULL, - IS_ASCENDING BOOLEAN NOT NULL, - PRIMARY KEY (DB_ID, INDEX_NAME), + PRIMARY KEY (INDEX_ID), FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME), - INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME) + INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES) ) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql index ae31d6f..d416006 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql @@ -1,14 +1,18 @@ CREATE TABLE INDEXES ( + INDEX_ID NUMBER(10) PRIMARY KEY, DB_ID INT NOT NULL, TID INT NOT NULL, INDEX_NAME VARCHAR2(128) NOT NULL, - COLUMN_NAME VARCHAR2(128) NOT NULL, - DATA_TYPE VARCHAR2(128) NOT NULL, INDEX_TYPE CHAR(32) NOT NULL, - IS_UNIQUE CHAR NOT NULL, - IS_CLUSTERED CHAR NOT NULL, - IS_ASCENDING CHAR NOT NULL, - CONSTRAINT INDEXES_PKEY PRIMARY KEY (DB_ID, INDEX_NAME), + PATH VARCHAR(4096) NOT NULL, + COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names + DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types + ORDERS VARCHAR(128) NOT NULL, -- array of column orders + NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings + IS_UNIQUE BOOLEAN NOT NULL, + IS_CLUSTERED BOOLEAN NOT NULL, FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, + UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME), + INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES) ) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index 8e5cbcc..f6b2b0d 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -97,18 +97,21 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. <tns:Object name="INDEXES" type="table" order="10"> <tns:sql><![CDATA[ CREATE TABLE INDEXES ( - DB_ID INT NOT NULL, - TID INT NOT NULL, - INDEX_NAME VARCHAR(128) NOT NULL, - COLUMN_NAME VARCHAR(128) NOT NULL, - DATA_TYPE VARCHAR(128) NOT NULL, - INDEX_TYPE CHAR(32) NOT NULL, - IS_UNIQUE BOOLEAN NOT NULL, - IS_CLUSTERED BOOLEAN NOT NULL, - IS_ASCENDING BOOLEAN NOT NULL, - CONSTRAINT INDEXES_PKEY PRIMARY KEY (DB_ID, INDEX_NAME), - FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE + INDEX_ID SERIAL NOT NULL PRIMARY KEY, + DB_ID INT NOT NULL, + TID INT NOT NULL, + INDEX_NAME VARCHAR(128) NOT NULL, + INDEX_TYPE CHAR(32) NOT NULL, + PATH VARCHAR(4096) NOT NULL, + COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names + DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types + ORDERS VARCHAR(128) NOT NULL, -- array of column orders + NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings + IS_UNIQUE BOOLEAN NOT NULL, + IS_CLUSTERED BOOLEAN NOT NULL, + CONSTRAINT INDEXES_PKEY PRIMARY KEY (INDEX_ID), + FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE, + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> </tns:sql> </tns:Object> @@ -116,7 +119,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. <tns:sql><![CDATA[CREATE UNIQUE INDEX INDEXES_IDX_DB_ID_NAME on INDEXES (DB_ID, INDEX_NAME)]]></tns:sql> </tns:Object> <tns:Object name="INDEXES_IDX_TID_COLUMN_NAME" type="index" order="12" dependsOn="INDEXES"> - <tns:sql><![CDATA[CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, COLUMN_NAME)]]></tns:sql> + <tns:sql><![CDATA[CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (DB_ID, TID, COLUMN_NAMES)]]></tns:sql> </tns:Object> <tns:Object name="STATS" type="table" order="13"> <tns:sql><![CDATA[ http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index e0b049c..044ef61 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -25,20 +25,16 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.exception.NoSuchFunctionException; -import org.apache.tajo.catalog.store.PostgreSQLStore; -import org.apache.tajo.function.Function; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.store.DerbyStore; -import org.apache.tajo.catalog.store.MySQLStore; -import org.apache.tajo.catalog.store.MariaDBStore; -import org.apache.tajo.catalog.store.OracleStore; +import org.apache.tajo.catalog.store.*; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.function.Function; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -47,6 +43,8 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; @@ -383,36 +381,43 @@ public class TestCatalog { static IndexDesc desc1; static IndexDesc desc2; static IndexDesc desc3; - - static { - desc1 = new IndexDesc( - "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4), - IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true); - - desc2 = new IndexDesc( - "idx_test2", new Path("idx_test2"), DEFAULT_DATABASE_NAME, "indexed", new Column("score", Type.FLOAT8), - IndexMethod.TWO_LEVEL_BIN_TREE, false, false, false); - - desc3 = new IndexDesc( - "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4), - IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true); - } + static Schema relationSchema; public static TableDesc prepareTable() throws IOException { - Schema schema = new Schema(); - schema.addColumn("indexed.id", Type.INT4) - .addColumn("indexed.name", Type.TEXT) - .addColumn("indexed.age", Type.INT4) - .addColumn("indexed.score", Type.FLOAT8); + relationSchema = new Schema(); + relationSchema.addColumn(DEFAULT_DATABASE_NAME + ".indexed.id", Type.INT4) + .addColumn(DEFAULT_DATABASE_NAME + ".indexed.name", Type.TEXT) + .addColumn(DEFAULT_DATABASE_NAME + ".indexed.age", Type.INT4) + .addColumn(DEFAULT_DATABASE_NAME + ".indexed.score", Type.FLOAT8); String tableName = "indexed"; TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); return new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, tableName), schema, meta, + CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, tableName), relationSchema, meta, new Path(CommonTestingUtil.getTestDir(), "indexed").toUri()); } + public static void prepareIndexDescs() throws IOException, URISyntaxException { + SortSpec[] colSpecs1 = new SortSpec[1]; + colSpecs1[0] = new SortSpec(new Column("default.indexed.id", Type.INT4), true, true); + desc1 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed", + "idx_test", new URI("idx_test"), colSpecs1, + IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema); + + SortSpec[] colSpecs2 = new SortSpec[1]; + colSpecs2[0] = new SortSpec(new Column("default.indexed.score", Type.FLOAT8), false, false); + desc2 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed", + "idx_test2", new URI("idx_test2"), colSpecs2, + IndexMethod.TWO_LEVEL_BIN_TREE, false, false, relationSchema); + + SortSpec[] colSpecs3 = new SortSpec[1]; + colSpecs3[0] = new SortSpec(new Column("default.indexed.id", Type.INT4), true, false); + desc3 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed", + "idx_test", new URI("idx_test"), colSpecs3, + IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema); + } + @Test public void testCreateSameTables() throws IOException { assertTrue(catalog.createDatabase("tmpdb3", TajoConstants.DEFAULT_TABLESPACE_NAME)); @@ -448,20 +453,29 @@ public class TestCatalog { @Test public void testAddAndDelIndex() throws Exception { TableDesc desc = prepareTable(); + prepareIndexDescs(); assertTrue(catalog.createTable(desc)); - assertFalse(catalog.existIndexByName("db1", desc1.getName())); - assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "id")); + assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc1.getName())); + assertFalse(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, "indexed", new String[]{"id"})); catalog.createIndex(desc1); assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc1.getName())); - assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "id")); + assertTrue(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, "indexed", new String[]{"id"})); assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc2.getName())); - assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "score")); + assertFalse(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, "indexed", new String[]{"score"})); catalog.createIndex(desc2); assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc2.getName())); - assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "score")); + assertTrue(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, "indexed", new String[]{"score"})); + + Set<IndexDesc> indexDescs = TUtil.newHashSet(); + indexDescs.add(desc1); + indexDescs.add(desc2); + indexDescs.add(desc3); + for (IndexDesc index : catalog.getAllIndexesByTable(DEFAULT_DATABASE_NAME, "indexed")) { + assertTrue(indexDescs.contains(index)); + } catalog.dropIndex(DEFAULT_DATABASE_NAME, desc1.getName()); assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc1.getName())); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index db7f981..5ecb161 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -497,7 +497,7 @@ public class TajoCli { if (response == null) { displayFormatter.printErrorMessage(sout, "response is null"); wasError = true; - } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { + } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); @@ -510,8 +510,8 @@ public class TajoCli { } } } else { - if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); + if (response.getResult().hasErrorMessage()) { + displayFormatter.printErrorMessage(sout, response.getResult().getErrorMessage()); wasError = true; } } @@ -534,7 +534,7 @@ public class TajoCli { if (response == null) { displayFormatter.printErrorMessage(sout, "response is null"); wasError = true; - } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { + } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); @@ -546,8 +546,8 @@ public class TajoCli { } } } else { - if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); + if (response.getResult().hasErrorMessage()) { + displayFormatter.printErrorMessage(sout, response.getResult().getErrorMessage()); wasError = true; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java index 5eebc2b..2915612 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java @@ -21,9 +21,12 @@ package org.apache.tajo.cli.tsql.commands; import org.apache.commons.lang.CharUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; import org.apache.tajo.cli.tsql.TajoCli; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -51,6 +54,22 @@ public class DescTableCommand extends TajoShellCommand { context.getOutput().println("Did not find any relation named \"" + tableName + "\""); } else { context.getOutput().println(toFormattedString(desc)); + // If there exists any indexes for the table, print index information + if (client.hasIndexes(tableName)) { + StringBuilder sb = new StringBuilder(); + sb.append("Indexes:\n"); + for (IndexDescProto index : client.getIndexes(tableName)) { + sb.append("\"").append(index.getIndexName()).append("\" "); + sb.append(index.getIndexMethod()).append(" ("); + for (SortSpecProto key : index.getKeySortSpecsList()) { + sb.append(CatalogUtil.extractSimpleName(key.getColumn().getName())); + sb.append(key.getAscending() ? " ASC" : " DESC"); + sb.append(key.getNullFirst() ? " NULLS FIRST, " : " NULLS LAST, "); + } + sb.delete(sb.length()-2, sb.length()-1).append(")\n"); + } + context.getOutput().println(sb.toString()); + } } } else if (cmd.length == 1) { List<String> tableList = client.getTableList(null); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java index a36fc0e..652008c 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java @@ -21,11 +21,13 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.hadoop.fs.Path; import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.catalog.IndexMeta; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import java.io.Closeable; import java.sql.SQLException; @@ -134,4 +136,18 @@ public interface CatalogAdminClient extends Closeable { public TableDesc getTableDesc(final String tableName) throws ServiceException; public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException; + + public IndexDescProto getIndex(final String indexName) throws ServiceException; + + public boolean existIndex(final String indexName) throws ServiceException; + + public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException; + + public boolean hasIndexes(final String tableName) throws ServiceException; + + public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException; + + public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException; + + public boolean dropIndex(final String indexName) throws ServiceException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 496161d..17fdb25 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -21,13 +21,12 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.hadoop.fs.Path; import org.apache.tajo.annotation.Nullable; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.SQLStates; import org.apache.tajo.rpc.NettyClientBase; @@ -37,7 +36,6 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; -import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; public class CatalogAdminClientImpl implements CatalogAdminClient { @@ -152,10 +150,10 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { builder.setPartition(partitionMethodDesc.getProto()); } ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { + if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { return CatalogUtil.newTableDesc(res.getTableDesc()); } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); } } @@ -226,10 +224,10 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { builder.setSessionId(connection.sessionId); builder.setTableName(tableName); ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { + if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { return CatalogUtil.newTableDesc(res.getTableDesc()); } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); } } @@ -250,10 +248,10 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { String paramFunctionName = functionName == null ? "" : functionName; ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, connection.convertSessionedString(paramFunctionName)); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { + if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { return res.getFunctionsList(); } else { - throw new SQLException(res.getErrorMessage()); + throw new SQLException(res.getResult().getErrorMessage()); } } @@ -261,6 +259,124 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { } @Override + public IndexDescProto getIndex(final String indexName) throws ServiceException { + return new ServerCallable<IndexDescProto>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public IndexDescProto call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getIndexWithName(null, + connection.convertSessionedString(indexName)); + } + }.withRetries(); + } + + @Override + public boolean existIndex(final String indexName) throws ServiceException { + return new ServerCallable<Boolean>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existIndexWithName(null, + connection.convertSessionedString(indexName)).getValue(); + } + }.withRetries(); + } + + @Override + public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException { + return new ServerCallable<List<IndexDescProto>>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public List<IndexDescProto> call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexesResponse response = tajoMasterService.getIndexesForTable(null, + connection.convertSessionedString(tableName)); + if (response.getResult().getResultCode() == ResultCode.OK) { + return response.getIndexesList(); + } else { + throw new SQLException(response.getResult().getErrorMessage()); + } + } + }.withRetries(); + } + + @Override + public boolean hasIndexes(final String tableName) throws ServiceException { + return new ServerCallable<Boolean>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existIndexesForTable(null, + connection.convertSessionedString(tableName)).getValue(); + } + }.withRetries(); + } + + @Override + public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException { + return new ServerCallable<IndexDescProto>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public IndexDescProto call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + for (String eachColumnName : columnNames) { + builder.addColumnNames(eachColumnName); + } + GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build()); + if (response.getResult().getResultCode() == ResultCode.OK) { + return response.getIndexDesc(); + } else { + throw new SQLException(response.getResult().getErrorMessage()); + } + } + }.withRetries(); + } + + @Override + public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException { + return new ServerCallable<Boolean>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + for (String eachColumnName : columnName) { + builder.addColumnNames(eachColumnName); + } + return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue(); + } + }.withRetries(); + } + + @Override + public boolean dropIndex(final String indexName) throws ServiceException { + return new ServerCallable<Boolean>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + @Override + public Boolean call(NettyClientBase client) throws Exception { + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.dropIndex(null, + connection.convertSessionedString(indexName)).getValue(); + } + }.withRetries(); + } + + @Override public void close() throws IOException { } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index bab3518..c643679 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -170,7 +170,7 @@ public class QueryClientImpl implements QueryClient { SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { + if (response.getResult().getResultCode() == ResultCode.OK) { connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); } return response; @@ -205,11 +205,11 @@ public class QueryClientImpl implements QueryClient { ClientProtos.SubmitQueryResponse response = executeQuery(sql); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - if (response.hasErrorMessage()) { - throw new ServiceException(response.getErrorMessage()); - } else if (response.hasErrorTrace()) { - throw new ServiceException(response.getErrorTrace()); + if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { + if (response.getResult().hasErrorMessage()) { + throw new ServiceException(response.getResult().getErrorMessage()); + } else if (response.getResult().hasErrorTrace()) { + throw new ServiceException(response.getResult().getErrorTrace()); } } @@ -241,8 +241,8 @@ public class QueryClientImpl implements QueryClient { ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); + if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getResult().getErrorTrace()); } QueryId queryId = new QueryId(response.getQueryId()); @@ -391,8 +391,8 @@ public class QueryClientImpl implements QueryClient { builder.setFetchRowNum(fetchRowNum); try { GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); + if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getResult().getErrorTrace()); } return response.getResultSet(); @@ -434,12 +434,12 @@ public class QueryClientImpl implements QueryClient { builder.setIsJson(false); ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { + if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); return true; } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); + if (response.getResult().hasErrorMessage()) { + System.err.println("ERROR: " + response.getResult().getErrorMessage()); } return false; } @@ -463,11 +463,11 @@ public class QueryClientImpl implements QueryClient { builder.setQuery(json); builder.setIsJson(true); ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { + if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { return true; } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); + if (response.getResult().hasErrorMessage()) { + System.err.println("ERROR: " + response.getResult().getErrorMessage()); } return false; } @@ -588,11 +588,11 @@ public class QueryClientImpl implements QueryClient { TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { + if (res.getResult().getResultCode() == ResultCode.OK) { return res.getQueryInfo(); } else { abort(); - throw new ServiceException(res.getErrorMessage()); + throw new ServiceException(res.getResult().getErrorMessage()); } } }.withRetries(); @@ -618,11 +618,11 @@ public class QueryClientImpl implements QueryClient { QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { + if (res.getResult().getResultCode() == ResultCode.OK) { return res.getQueryHistory(); } else { abort(); - throw new ServiceException(res.getErrorMessage()); + throw new ServiceException(res.getResult().getErrorMessage()); } } }.withRetries(); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java index 4a38934..486f95c 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java @@ -41,11 +41,11 @@ public class QueryStatus { submitTime = proto.getSubmitTime(); finishTime = proto.getFinishTime(); hasResult = proto.getHasResult(); - if (proto.hasErrorMessage()) { - errorText = proto.getErrorMessage(); + if (proto.getResult().hasErrorMessage()) { + errorText = proto.getResult().getErrorMessage(); } - if (proto.hasErrorTrace()) { - errorTrace = proto.getErrorTrace(); + if (proto.getResult().hasErrorTrace()) { + errorTrace = proto.getResult().getErrorTrace(); } queryMasterHost = proto.getQueryMasterHost(); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 1bc8050..0f82ae8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -184,11 +184,11 @@ public class SessionConnection implements Closeable { SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - if (response.getResultCode() == ResultCode.OK) { + if (response.getResult().getResultCode() == ResultCode.OK) { updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); return Collections.unmodifiableMap(sessionVarsCache); } else { - throw new ServiceException(response.getMessage()); + throw new ServiceException(response.getResult().getErrorMessage()); } } }.withRetries(); @@ -207,11 +207,11 @@ public class SessionConnection implements Closeable { SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - if (response.getResultCode() == ResultCode.OK) { + if (response.getResult().getResultCode() == ResultCode.OK) { updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); return Collections.unmodifiableMap(sessionVarsCache); } else { - throw new ServiceException(response.getMessage()); + throw new ServiceException(response.getResult().getErrorMessage()); } } }.withRetries(); @@ -334,7 +334,7 @@ public class SessionConnection implements Closeable { CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { + if (response.getResult().getResultCode() == ResultCode.OK) { sessionId = response.getSessionId(); updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); @@ -343,7 +343,7 @@ public class SessionConnection implements Closeable { } } else { - throw new InvalidClientSessionException(response.getMessage()); + throw new InvalidClientSessionException(response.getResult().getErrorMessage()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index 8eafc91..9522746 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryId; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.catalog.IndexMeta; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ha.HAServiceUtil; @@ -237,4 +239,39 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { return catalogClient.getFunctions(functionName); } + + @Override + public IndexDescProto getIndex(String indexName) throws ServiceException { + return catalogClient.getIndex(indexName); + } + + @Override + public boolean existIndex(String indexName) throws ServiceException { + return catalogClient.existIndex(indexName); + } + + @Override + public List<IndexDescProto> getIndexes(String tableName) throws ServiceException { + return catalogClient.getIndexes(tableName); + } + + @Override + public boolean hasIndexes(String tableName) throws ServiceException { + return catalogClient.hasIndexes(tableName); + } + + @Override + public IndexDescProto getIndex(String tableName, String[] columnNames) throws ServiceException { + return catalogClient.getIndex(tableName, columnNames); + } + + @Override + public boolean existIndex(String tableName, String[] columnName) throws ServiceException { + return catalogClient.existIndex(tableName, columnName); + } + + @Override + public boolean dropIndex(String indexName) throws ServiceException { + return catalogClient.dropIndex(indexName); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index a9f5498..0454901 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -31,16 +31,21 @@ enum ResultCode { ERROR = 1; } +message RequestResult { + required ResultCode resultCode = 1; + optional string errorMessage = 2; + optional string errorTrace = 3; +} + message CreateSessionRequest { required string username = 1; optional string baseDatabaseName = 2; } message CreateSessionResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional SessionIdProto sessionId = 2; optional KeyValueSetProto sessionVars = 3; - optional string message = 4; } message UpdateSessionVariableRequest { @@ -50,9 +55,8 @@ message UpdateSessionVariableRequest { } message SessionUpdateResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional KeyValueSetProto sessionVars = 2; - optional string message = 3; } message SessionedStringProto { @@ -61,9 +65,8 @@ message SessionedStringProto { } message ExplainQueryResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional string explain = 2; - optional string errorMessage = 3; } message QueryRequest { @@ -74,9 +77,8 @@ message QueryRequest { } message UpdateQueryResponse { - required ResultCode resultCode = 1; - optional string errorMessage = 2; - optional KeyValueSetProto sessionVars = 3; + required RequestResult result = 1; + optional KeyValueSetProto sessionVars = 2; } message GetQueryResultRequest { @@ -126,7 +128,7 @@ message SerializedResultSet { } message SubmitQueryResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; required QueryIdProto queryId = 2; required string userName = 3; optional bool isForwarded = 4 [default = false]; @@ -138,24 +140,19 @@ message SubmitQueryResponse { optional TableDescProto tableDesc = 8; optional int32 maxRowNum = 9; - optional string errorMessage = 10; - optional string errorTrace = 11; - - optional KeyValueSetProto sessionVars = 12; + optional KeyValueSetProto sessionVars = 10; } message GetQueryStatusResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; required QueryIdProto queryId = 2; optional QueryState state = 3; optional float progress = 4; optional int64 submitTime = 5; optional int64 finishTime = 7; optional bool hasResult = 8; - optional string errorMessage = 9; - optional string errorTrace = 10; - optional string queryMasterHost = 11; - optional int32 queryMasterPort = 12; + optional string queryMasterHost = 9; + optional int32 queryMasterPort = 10; } message GetQueryResultDataRequest { @@ -165,10 +162,8 @@ message GetQueryResultDataRequest { } message GetQueryResultDataResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; required SerializedResultSet resultSet = 2; - optional string errorMessage = 3; - optional string errorTrace = 4; } message GetClusterInfoRequest { @@ -228,15 +223,13 @@ message DropTableRequest { } message TableResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional TableDescProto tableDesc = 2; - optional string errorMessage = 3; } message FunctionResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; repeated FunctionDescProto functions = 2; - optional string errorMessage = 3; } message QueryInfoProto { @@ -289,14 +282,32 @@ message QueryHistoryProto { } message GetQueryHistoryResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional QueryHistoryProto queryHistory = 2; - optional string errorMessage = 3; } message GetQueryInfoResponse { - required ResultCode resultCode = 1; + required RequestResult result = 1; optional QueryInfoProto queryInfo = 2; - optional string errorMessage = 3; } +message CreateIndexResponse { + required RequestResult result = 1; + optional IndexDescProto indexDesc = 2; +} + +message GetIndexesResponse { + required RequestResult result = 1; + repeated IndexDescProto indexes = 2; +} + +message GetIndexWithColumnsRequest { + required SessionIdProto sessionId = 1; + required string tableName = 2; + repeated string columnNames = 3; +} + +message GetIndexWithColumnsResponse { + required RequestResult result = 1; + optional IndexDescProto indexDesc = 2; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/proto/TajoMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 10ca268..586f9ab 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -68,4 +68,13 @@ service TajoMasterClientProtocolService { rpc getTableList(GetTableListRequest) returns (GetTableListResponse); rpc getTableDesc(GetTableDescRequest) returns (TableResponse); rpc getFunctionList(SessionedStringProto) returns (FunctionResponse); + + // Index Management APIs + rpc getIndexWithName(SessionedStringProto) returns (IndexDescProto); + rpc existIndexWithName(SessionedStringProto) returns (BoolProto); + rpc getIndexesForTable(SessionedStringProto) returns (GetIndexesResponse); + rpc existIndexesForTable(SessionedStringProto) returns (BoolProto); + rpc getIndexWithColumns(GetIndexWithColumnsRequest) returns (GetIndexWithColumnsResponse); + rpc existIndexWithColumns(GetIndexWithColumnsRequest) returns (BoolProto); + rpc dropIndex(SessionedStringProto) returns (BoolProto); } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java index b7a5da7..d65f4c3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java @@ -171,7 +171,7 @@ public class OverridableConf extends KeyValueSet { } public float getFloat(ConfigKey key) { - return getLong(key, null); + return getFloat(key, null); } public void put(ConfigKey key, String val) { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index d87bbef..be0639e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -123,6 +123,10 @@ public enum SessionVars implements ConfigKey { NULL_CHAR(ConfVars.$TEXT_NULL, "null char of text file output", DEFAULT), CODEGEN(ConfVars.$CODEGEN, "Runtime code generation enabled (experiment)", DEFAULT), + // for index + INDEX_ENABLED(ConfVars.$INDEX_ENABLED, "index scan enabled", DEFAULT), + INDEX_SELECTIVITY_THRESHOLD(ConfVars.$INDEX_SELECTIVITY_THRESHOLD, "the selectivity threshold for index scan", DEFAULT), + // Behavior Control --------------------------------------------------------- ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, "If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT), http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ab11ddd..9a3ec6b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -319,6 +319,10 @@ public class TajoConf extends Configuration { $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation + // for index + $INDEX_ENABLED("tajo.query.index.enabled", false), + $INDEX_SELECTIVITY_THRESHOLD("tajo.query.index.selectivity.threshold", 0.05f), + // Client ----------------------------------------------------------------- $CLIENT_SESSION_EXPIRY_TIME("tajo.client.session.expiry-time-sec", 3600), // default time is one hour. @@ -498,7 +502,7 @@ public class TajoConf extends Configuration { } public static long getLongVar(Configuration conf, ConfVars var) { - assert (var.valClass == Long.class || var.valClass == Integer.class); + assert (var.valClass == Long.class || var.valClass == Integer.class || var.valClass == Float.class); if (var.valClass == Integer.class) { return conf.getInt(var.varname, var.defaultIntVal); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java index a1de860..65d795d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java @@ -21,6 +21,8 @@ package org.apache.tajo.util; import com.google.common.base.Objects; import java.lang.reflect.Array; +import java.net.URI; +import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -299,4 +301,12 @@ public class TUtil { StackTraceElement element = ste[2 + depth]; return element.getClassName() + ":" + element.getMethodName() + "(" + element.getLineNumber() +")"; } + + public static URI stringToURI(String str) { + try { + return new URI(str); + } catch (URISyntaxException e) { + throw new RuntimeException("Cannot convert " + str + " to the URI type", e); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index e73680d..5754040 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -77,10 +77,19 @@ schema_statement ; index_statement + : create_index_statement + | drop_index_statement + ; + +create_index_statement : CREATE (u=UNIQUE)? INDEX identifier ON table_name (method_specifier)? LEFT_PAREN sort_specifier_list RIGHT_PAREN param_clause? (where_clause)? ; +drop_index_statement + : DROP INDEX index_name = identifier + ; + database_definition : CREATE DATABASE (if_not_exists)? dbname = identifier ; http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java index 79513dc..4199b00 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java @@ -214,4 +214,11 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp return node; } + + @Override + public LogicalNode visitIndexScan(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws PlanningException { + visitScan(context, plan, block, node, stack); + return node; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index d0db2b0..aea5a59 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -1187,7 +1187,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { } @Override - public Expr visitIndex_statement(SQLParser.Index_statementContext ctx) { + public Expr visitCreate_index_statement(SQLParser.Create_index_statementContext ctx) { String indexName = ctx.identifier().getText(); String tableName = ctx.table_name().getText(); Relation relation = new Relation(tableName); @@ -1223,6 +1223,12 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { } @Override + public Expr visitDrop_index_statement(SQLParser.Drop_index_statementContext ctx) { + String indexName = ctx.identifier().getText(); + return new DropIndex(indexName); + } + + @Override public Expr visitDatabase_definition(@NotNull SQLParser.Database_definitionContext ctx) { return new CreateDatabase(ctx.identifier().getText(), null, checkIfExist(ctx.if_not_exists())); } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 3218e15..17a9bb1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; import org.apache.tajo.plan.serder.LogicalNodeDeserializer; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; @@ -47,20 +49,18 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.IndexUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; +import java.util.*; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; @@ -233,7 +233,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { return new LimitExec(ctx, limitNode.getInSchema(), limitNode.getOutSchema(), leftExec, limitNode); - case BST_INDEX_SCAN: + case INDEX_SCAN: IndexScanNode indexScanNode = (IndexScanNode) logicalNode; leftExec = createIndexScanExec(ctx, indexScanNode); return leftExec; @@ -1187,15 +1187,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); - String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); - FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(ctx.getConf()); - Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index"); - - TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(), - annotation.getSortKeys()); - return new BSTIndexScanExec(ctx, annotation, fragments.get(0), new Path(indexPath, indexName), - annotation.getKeySchema(), comp, annotation.getDatum()); - + return new BSTIndexScanExec(ctx, annotation, fragments.get(0), annotation.getIndexPath(), + annotation.getKeySchema(), annotation.getPredicates()); } public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 9b37a80..67f5691 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -1525,6 +1525,15 @@ public class GlobalPlanner { } @Override + public LogicalNode visitIndexScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws PlanningException { + ExecutionBlock newBlock = context.plan.newExecutionBlock(); + newBlock.setPlan(node); + context.execBlockMap.put(node.getPID(), newBlock); + return node; + } + + @Override public LogicalNode visitPartitionedTableScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack)throws PlanningException { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 6adc523..3e4c62d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -18,23 +18,34 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.hadoop.fs.FileSystem; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.expr.EvalTreeUtil; +import org.apache.tajo.plan.logical.IndexScanNode; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.URI; +import java.util.Set; public class BSTIndexScanExec extends PhysicalExec { - private ScanNode scanNode; + private final static Log LOG = LogFactory.getLog(BSTIndexScanExec.class); + private IndexScanNode scanNode; private SeekableScanner fileScanner; private EvalNode qual; @@ -42,31 +53,62 @@ public class BSTIndexScanExec extends PhysicalExec { private Projector projector; - private Datum[] datum = null; - + private Datum[] values = null; + private boolean initialize = true; private float progress; - public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode , - FileFragment fragment, Path fileName , Schema keySchema, - TupleComparator comparator , Datum[] datum) throws IOException { + private TableStats inputStats; + + public BSTIndexScanExec(TaskAttemptContext context, + IndexScanNode scanNode , + FileFragment fragment, URI indexPrefix , Schema keySchema, + SimplePredicate [] predicates) throws IOException { super(context, scanNode.getInSchema(), scanNode.getOutSchema()); this.scanNode = scanNode; this.qual = scanNode.getQual(); - this.datum = datum; + + SortSpec[] keySortSpecs = new SortSpec[predicates.length]; + values = new Datum[predicates.length]; + for (int i = 0; i < predicates.length; i++) { + keySortSpecs[i] = predicates[i].getKeySortSpec(); + values[i] = predicates[i].getValue(); + } + + TupleComparator comparator = new BaseTupleComparator(keySchema, + keySortSpecs); + + Schema fileScanOutSchema = mergeSubSchemas(inSchema, keySchema, scanNode.getTargets(), qual); this.fileScanner = StorageManager.getSeekableScanner(context.getConf(), - scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema); + scanNode.getTableDesc().getMeta(), inSchema, fragment, fileScanOutSchema); this.fileScanner.init(); this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); - FileSystem fs = fileName.getFileSystem(context.getConf()); - this.reader = new BSTIndex(fs.getConf()). - getIndexReader(fileName, keySchema, comparator); + Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments()); + this.reader = new BSTIndex(context.getConf()). + getIndexReader(indexPath, keySchema, comparator); this.reader.open(); } + private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) { + Schema mergedSchema = new Schema(); + Set<Column> qualAndTargets = TUtil.newHashSet(); + qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual)); + for (Target target : targets) { + qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree())); + } + for (Column column : originalSchema.getColumns()) { + if (subSchema.contains(column) + || qualAndTargets.contains(column) + || qualAndTargets.contains(column)) { + mergedSchema.addColumn(column); + } + } + return mergedSchema; + } + @Override public void init() throws IOException { progress = 0.0f; @@ -76,8 +118,8 @@ public class BSTIndexScanExec extends PhysicalExec { public Tuple next() throws IOException { if(initialize) { //TODO : more complicated condition - Tuple key = new VTuple(datum.length); - key.put(datum); + Tuple key = new VTuple(values.length); + key.put(values); long offset = reader.find(key); if (offset == -1) { reader.close(); @@ -116,8 +158,11 @@ public class BSTIndexScanExec extends PhysicalExec { return outTuple; } else { long offset = reader.next(); - if (offset == -1) return null; + if (offset == -1) { + return null; + } else fileScanner.seek(offset); + return null; } } } @@ -132,6 +177,16 @@ public class BSTIndexScanExec extends PhysicalExec { @Override public void close() throws IOException { IOUtils.cleanup(null, reader, fileScanner); + if (fileScanner != null) { + try { + TableStats stats = fileScanner.getInputStats(); + if (stats != null) { + inputStats = (TableStats) stats.clone(); + } + } catch (CloneNotSupportedException e) { + e.printStackTrace(); + } + } reader = null; fileScanner = null; scanNode = null; @@ -143,4 +198,13 @@ public class BSTIndexScanExec extends PhysicalExec { public float getProgress() { return progress; } + + @Override + public TableStats getInputStats() { + if (fileScanner != null) { + return fileScanner.getInputStats(); + } else { + return inputStats; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java index 72a667d..9594b58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java @@ -58,6 +58,7 @@ public class ProjectionExec extends UnaryPhysicalExec { } projector.eval(tuple, outTuple); + outTuple.setOffset(tuple.getOffset()); return outTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index 0592217..f9db842 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -34,6 +35,7 @@ import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.util.Arrays; public class StoreIndexExec extends UnaryPhysicalExec { private static final Log LOG = LogFactory.getLog(StoreIndexExec.class); @@ -53,7 +55,7 @@ public class StoreIndexExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); - SortSpec[] sortSpecs = logicalPlan.getSortSpecs(); + SortSpec[] sortSpecs = logicalPlan.getKeySortSpecs(); indexKeys = new int[sortSpecs.length]; keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); @@ -64,8 +66,7 @@ public class StoreIndexExec extends UnaryPhysicalExec { } TajoConf conf = context.getConf(); - Path indexPath = new Path(logicalPlan.getIndexPath(), ""+context.getUniqueKeyFromFragments()); - System.out.println("exec: " + indexPath); + Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); // TODO: Create factory using reflection BSTIndex bst = new BSTIndex(conf); this.comparator = new BaseTupleComparator(keySchema, sortSpecs); @@ -78,17 +79,13 @@ public class StoreIndexExec extends UnaryPhysicalExec { public Tuple next() throws IOException { Tuple tuple; Tuple keyTuple; - Tuple prevKeyTuple = null; long offset; while((tuple = child.next()) != null) { offset = tuple.getOffset(); keyTuple = new VTuple(keySchema.size()); RowStoreUtil.project(tuple, keyTuple, indexKeys); - if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) { - indexWriter.write(keyTuple, offset); - prevKeyTuple = keyTuple; - } + indexWriter.write(keyTuple, offset); } return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java index 29dc845..709aa81 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java @@ -22,6 +22,7 @@ import org.apache.tajo.OverridableConf; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; @SuppressWarnings("unused") public class ErrorInjectionRewriter implements LogicalPlanRewriteRule { @@ -31,12 +32,12 @@ public class ErrorInjectionRewriter implements LogicalPlanRewriteRule { } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { + public boolean isEligible(LogicalPlanRewriteRuleContext context) { return true; } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws PlanningException { throw new NullPointerException(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 51964f0..6ba413c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -34,7 +34,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.DDLExecutor; import org.apache.tajo.master.exec.QueryExecutor; @@ -51,6 +51,7 @@ import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.IPCUtil; import java.io.IOException; import java.sql.SQLException; @@ -90,7 +91,8 @@ public class GlobalEngine extends AbstractService { analyzer = new SQLAnalyzer(); preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); planner = new LogicalPlanner(context.getCatalog()); - optimizer = new LogicalOptimizer(context.getConf()); + // Access path rewriter is enabled only in QueryMasterTask + optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog()); annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog()); } catch (Throwable t) { LOG.error(t.getMessage(), t); @@ -169,13 +171,12 @@ public class GlobalEngine extends AbstractService { responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME)); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setIsForwarded(true); - responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); String errorMessage = t.getMessage(); if (t.getMessage() == null) { errorMessage = t.getClass().getName(); } - responseBuilder.setErrorMessage(errorMessage); - responseBuilder.setErrorTrace(StringUtils.stringifyException(t)); + responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + errorMessage, StringUtils.stringifyException(t))); return responseBuilder.build(); } }
