http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 9575c13..51f65ee 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -22,6 +22,7 @@ package org.apache.tajo.catalog.store; import com.google.common.collect.Maps; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; @@ -29,7 +30,17 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; +import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.*; @@ -84,6 +95,22 @@ public class MemStore implements CatalogStore { public Collection<String> getAllTablespaceNames() throws CatalogException { return 
tablespaces.keySet(); } + + @Override + public List<TablespaceProto> getTablespaces() throws CatalogException { + List<TablespaceProto> tablespaceList = TUtil.newList(); + int tablespaceId = 0; + + for (String spaceName: tablespaces.keySet()) { + TablespaceProto.Builder builder = TablespaceProto.newBuilder(); + builder.setSpaceName(spaceName); + builder.setUri(tablespaces.get(spaceName)); + builder.setId(tablespaceId++); + tablespaceList.add(builder.build()); + } + + return tablespaceList; + } @Override public TablespaceProto getTablespace(String spaceName) throws CatalogException { @@ -139,6 +166,24 @@ public class MemStore implements CatalogStore { public Collection<String> getAllDatabaseNames() throws CatalogException { return databases.keySet(); } + + @Override + public List<DatabaseProto> getAllDatabases() throws CatalogException { + List<DatabaseProto> databaseList = new ArrayList<DatabaseProto>(); + int dbId = 0; + + for (String databaseName: databases.keySet()) { + DatabaseProto.Builder builder = DatabaseProto.newBuilder(); + + builder.setId(dbId++); + builder.setName(databaseName); + builder.setSpaceId(0); + + databaseList.add(builder.build()); + } + + return databaseList; + } /** * Get a database namespace from a Map instance. 
@@ -303,6 +348,118 @@ public class MemStore implements CatalogStore { Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName); return new ArrayList<String>(database.keySet()); } + + @Override + public List<TableDescriptorProto> getAllTables() throws CatalogException { + List<TableDescriptorProto> tableList = new ArrayList<CatalogProtos.TableDescriptorProto>(); + int dbId = 0, tableId = 0; + + for (String databaseName: databases.keySet()) { + Map<String, TableDescProto> tables = databases.get(databaseName); + List<String> tableNameList = TUtil.newList(tables.keySet()); + Collections.sort(tableNameList); + + for (String tableName: tableNameList) { + TableDescProto tableDesc = tables.get(tableName); + TableDescriptorProto.Builder builder = TableDescriptorProto.newBuilder(); + + builder.setDbId(dbId); + builder.setTid(tableId); + builder.setName(tableName); + builder.setPath(tableDesc.getPath()); + builder.setTableType(tableDesc.getIsExternal()?"EXTERNAL":"BASE"); + builder.setStoreType(tableDesc.getMeta().getStoreType().toString()); + + tableList.add(builder.build()); + tableId++; + } + dbId++; + } + + return tableList; + } + + @Override + public List<TableOptionProto> getAllTableOptions() throws CatalogException { + List<TableOptionProto> optionList = new ArrayList<CatalogProtos.TableOptionProto>(); + int tid = 0; + + for (String databaseName: databases.keySet()) { + Map<String, TableDescProto> tables = databases.get(databaseName); + List<String> tableNameList = TUtil.newList(tables.keySet()); + Collections.sort(tableNameList); + + for (String tableName: tableNameList) { + TableDescProto table = tables.get(tableName); + List<KeyValueProto> keyValueList = table.getMeta().getParams().getKeyvalList(); + + for (KeyValueProto keyValue: keyValueList) { + TableOptionProto.Builder builder = TableOptionProto.newBuilder(); + + builder.setTid(tid); + builder.setKeyval(keyValue); + + optionList.add(builder.build()); + } + tid++; + } + 
} + + return optionList; + } + + @Override + public List<TableStatsProto> getAllTableStats() throws CatalogException { + List<TableStatsProto> statList = new ArrayList<CatalogProtos.TableStatsProto>(); + int tid = 0; + + for (String databaseName: databases.keySet()) { + Map<String, TableDescProto> tables = databases.get(databaseName); + List<String> tableNameList = TUtil.newList(tables.keySet()); + Collections.sort(tableNameList); + + for (String tableName: tableNameList) { + TableDescProto table = tables.get(tableName); + TableStatsProto.Builder builder = TableStatsProto.newBuilder(); + + builder.setTid(tid); + builder.setNumRows(table.getStats().getNumRows()); + builder.setNumBytes(table.getStats().getNumBytes()); + + statList.add(builder.build()); + tid++; + } + } + + return statList; + } + + @Override + public List<ColumnProto> getAllColumns() throws CatalogException { + List<ColumnProto> columnList = new ArrayList<CatalogProtos.ColumnProto>(); + int tid = 0; + + for (String databaseName: databases.keySet()) { + Map<String, TableDescProto> tables = databases.get(databaseName); + List<String> tableNameList = TUtil.newList(tables.keySet()); + Collections.sort(tableNameList); + + for (String tableName: tableNameList) { + TableDescProto tableDesc = tables.get(tableName); + + for (ColumnProto column: tableDesc.getSchema().getFieldsList()) { + ColumnProto.Builder builder = ColumnProto.newBuilder(); + builder.setTid(tid); + builder.setName(column.getName()); + builder.setDataType(column.getDataType()); + columnList.add(builder.build()); + } + tid++; + } + } + + return columnList; + } @Override public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { @@ -370,6 +527,11 @@ public class MemStore implements CatalogStore { public void dropPartitions(String tableName) throws CatalogException { throw new RuntimeException("not supported!"); } + + @Override + public List<TablePartitionProto> getAllPartitions() throws 
CatalogException { + throw new UnsupportedOperationException(); + } /* (non-Javadoc) * @see CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto) @@ -455,6 +617,33 @@ public class MemStore implements CatalogStore { return protos.toArray(new IndexDescProto[protos.size()]); } + + @Override + public List<IndexProto> getAllIndexes() throws CatalogException { + List<IndexProto> indexList = new ArrayList<CatalogProtos.IndexProto>(); + Set<String> databases = indexes.keySet(); + + for (String databaseName: databases) { + Map<String, IndexDescProto> indexMap = indexes.get(databaseName); + + for (String indexName: indexMap.keySet()) { + IndexDescProto indexDesc = indexMap.get(indexName); + IndexProto.Builder builder = IndexProto.newBuilder(); + + builder.setColumnName(indexDesc.getColumn().getName()); + builder.setDataType(indexDesc.getColumn().getDataType().getType().toString()); + builder.setIndexName(indexName); + builder.setIndexType(indexDesc.getIndexMethod().toString()); + builder.setIsAscending(indexDesc.hasIsAscending() && indexDesc.getIsAscending()); + builder.setIsClustered(indexDesc.hasIsClustered() && indexDesc.getIsClustered()); + builder.setIsUnique(indexDesc.hasIsUnique() && indexDesc.getIsUnique()); + + indexList.add(builder.build()); + } + } + + return indexList; + } @Override public void addFunction(FunctionDesc func) throws CatalogException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index a044d64..43c6f7d 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -19,8 +19,10 @@ package org.apache.tajo.catalog; import com.google.common.collect.Sets; + import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.exception.NoSuchFunctionException; import org.apache.tajo.catalog.store.PostgreSQLStore; @@ -53,7 +55,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceType; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.SetLocation; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; public class TestCatalog { static final String FieldName1="f1"; @@ -211,6 +212,7 @@ public class TestCatalog { @Test public void testCreateAndDropManyDatabases() throws Exception { List<String> createdDatabases = new ArrayList<String>(); + InfoSchemaMetadataDictionary dictionary = new InfoSchemaMetadataDictionary(); String namePrefix = "database_"; final int NUM = 10; for (int i = 0; i < NUM; i++) { @@ -223,10 +225,11 @@ public class TestCatalog { Collection<String> allDatabaseNames = catalog.getAllDatabaseNames(); for (String databaseName : allDatabaseNames) { - 
assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName)); + assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName) || + dictionary.isSystemDatabase(databaseName)); } - // additional one is 'default' database. - assertEquals(NUM + 1, allDatabaseNames.size()); + // additional ones are 'default' and 'system' databases. + assertEquals(NUM + 2, allDatabaseNames.size()); Collections.shuffle(createdDatabases); for (String tobeDropped : createdDatabases) { @@ -351,8 +354,8 @@ public class TestCatalog { } } - // Finally, only default database will remain. So, its result is 1. - assertEquals(1, catalog.getAllDatabaseNames().size()); + // Finally, only the default and system databases will remain. So, the result is 2. + assertEquals(2, catalog.getAllDatabaseNames().size()); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java new file mode 100644 index 0000000..d6ea459 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import com.google.protobuf.ByteString; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.engine.planner.physical.SeqScanExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { + private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; + + private QueryId queryId; + private String sessionId; + private SeqScanExec scanExec; + private TableDesc tableDesc; + private RowStoreEncoder rowEncoder; + private int maxRow; + private int currentNumRows; + private TaskAttemptContext taskContext; + private TajoConf tajoConf; + private ScanNode scanNode; + + private int currentFragmentIndex = 0; + + public NonForwardQueryResultFileScanner(TajoConf 
tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, + TableDesc tableDesc, int maxRow) throws IOException { + this.tajoConf = tajoConf; + this.sessionId = sessionId; + this.queryId = queryId; + this.scanNode = scanNode; + this.tableDesc = tableDesc; + this.maxRow = maxRow; + this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); + } + + public void init() throws IOException { + initSeqScanExec(); + } + + private void initSeqScanExec() throws IOException { + List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) + .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + + if (fragments != null && !fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); + this.taskContext = new TaskAttemptContext( + new QueryContext(tajoConf), null, + new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), + fragmentProtos, null); + try { + // scanNode must be clone cause SeqScanExec change target in the case of + // a partitioned table. 
+ scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); + } catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } + scanExec.init(); + currentFragmentIndex += fragments.size(); + } + } + + public QueryId getQueryId() { + return queryId; + } + + public String getSessionId() { + return sessionId; + } + + public void setScanExec(SeqScanExec scanExec) { + this.scanExec = scanExec; + } + + public TableDesc getTableDesc() { + return tableDesc; + } + + public void close() throws Exception { + if (scanExec != null) { + scanExec.close(); + scanExec = null; + } + } + + public List<ByteString> getNextRows(int fetchRowNum) throws IOException { + List<ByteString> rows = new ArrayList<ByteString>(); + if (scanExec == null) { + return rows; + } + int rowCount = 0; + while (true) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + scanExec.close(); + scanExec = null; + initSeqScanExec(); + if (scanExec != null) { + tuple = scanExec.next(); + } + if (tuple == null) { + if (scanExec != null) { + scanExec.close(); + scanExec = null; + } + break; + } + } + rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); + rowCount++; + currentNumRows++; + if (rowCount >= fetchRowNum) { + break; + } + if (currentNumRows >= maxRow) { + scanExec.close(); + scanExec = null; + break; + } + } + return rows; + } + + @Override + public Schema getLogicalSchema() { + return tableDesc.getLogicalSchema(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java index aced80c..7e7d705 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java @@ -18,149 +18,29 @@ package org.apache.tajo.master; -import com.google.protobuf.ByteString; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.engine.planner.physical.SeqScanExec; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.worker.TaskAttemptContext; - import java.io.IOException; -import java.util.ArrayList; import java.util.List; -public class NonForwardQueryResultScanner { - private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; - - private QueryId queryId; - private String sessionId; - private SeqScanExec scanExec; - private TableDesc tableDesc; - private RowStoreEncoder rowEncoder; - private int maxRow; - private int currentNumRows; - private TaskAttemptContext taskContext; - private TajoConf tajoConf; - private ScanNode scanNode; - - private int currentFragmentIndex = 0; - - public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId, - QueryId queryId, - ScanNode scanNode, - TableDesc tableDesc, - int maxRow) throws IOException { - this.tajoConf = tajoConf; - this.sessionId = sessionId; - this.queryId = queryId; - this.scanNode = scanNode; - this.tableDesc = tableDesc; - this.maxRow = maxRow; - - this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); - } - - public void init() throws IOException { - 
initSeqScanExec(); - } - - private void initSeqScanExec() throws IOException { - List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) - .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - - if (fragments != null && !fragments.isEmpty()) { - FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); - this.taskContext = new TaskAttemptContext( - new QueryContext(tajoConf), null, - new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), - fragmentProtos, null); - - try { - // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table. - scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos); - } catch (CloneNotSupportedException e) { - throw new IOException(e.getMessage(), e); - } - scanExec.init(); - currentFragmentIndex += fragments.size(); - } - } - - public QueryId getQueryId() { - return queryId; - } - - public String getSessionId() { - return sessionId; - } - - public void setScanExec(SeqScanExec scanExec) { - this.scanExec = scanExec; - } +import org.apache.tajo.QueryId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; - public TableDesc getTableDesc() { - return tableDesc; - } +import com.google.protobuf.ByteString; - public void close() throws Exception { - if (scanExec != null) { - scanExec.close(); - scanExec = null; - } - } +public interface NonForwardQueryResultScanner { - public List<ByteString> getNextRows(int fetchRowNum) throws IOException { - List<ByteString> rows = new ArrayList<ByteString>(); - if (scanExec == null) { - return rows; - } - int rowCount = 0; + public void close() throws Exception; - while (true) { - Tuple tuple = scanExec.next(); - if (tuple == null) { - scanExec.close(); - scanExec = null; + public Schema getLogicalSchema(); - initSeqScanExec(); - if (scanExec != null) { - tuple = 
scanExec.next(); - } - if (tuple == null) { - if (scanExec != null ) { - scanExec.close(); - scanExec = null; - } + public List<ByteString> getNextRows(int fetchRowNum) throws IOException; - break; - } - } - rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); - rowCount++; - currentNumRows++; - if (rowCount >= fetchRowNum) { - break; - } + public QueryId getQueryId(); + + public String getSessionId(); + + public TableDesc getTableDesc(); - if (currentNumRows >= maxRow) { - scanExec.close(); - scanExec = null; - break; - } - } + public void init() throws IOException; - return rows; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java new file mode 100644 index 0000000..c6466f5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java @@ -0,0 +1,616 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; +import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.codegen.CompilationError; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; +import org.apache.tajo.engine.planner.global.GlobalPlanner; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.plan.LogicalPlan; +import 
org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.IndexScanNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; + +import com.google.protobuf.ByteString; + +public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner { + + private final Log LOG = LogFactory.getLog(getClass()); + + private MasterContext masterContext; + private LogicalPlan logicalPlan; + private final QueryId queryId; + private final String sessionId; + private TaskAttemptContext taskContext; + private int currentRow; + private long maxRow; + private TableDesc tableDesc; + private Schema outSchema; + private RowStoreEncoder encoder; + private PhysicalExec physicalExec; + + public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, + String sessionId, int maxRow) { + masterContext = context; + logicalPlan = plan; + this.queryId = queryId; + this.sessionId = sessionId; + this.maxRow = maxRow; + + } + + @Override + public void init() throws IOException { + QueryContext queryContext = new QueryContext(masterContext.getConf()); + currentRow = 0; + + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan); + GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog()); + try { + globalPlanner.build(masterPlan); + } catch (PlanningException e) { + throw new RuntimeException(e); + } + + ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan); + ExecutionBlock leafBlock = null; + while (cursor.hasNext()) { + ExecutionBlock block = 
cursor.nextBlock(); + if (masterPlan.isLeaf(block)) { + leafBlock = block; + break; + } + } + + taskContext = new TaskAttemptContext(queryContext, null, + new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0), + null, null); + physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf()) + .createPlan(taskContext, leafBlock.getPlan()); + + tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), + new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null); + outSchema = physicalExec.getSchema(); + encoder = RowStoreUtil.createEncoder(getLogicalSchema()); + + physicalExec.init(); + } + + @Override + public void close() throws Exception { + tableDesc = null; + outSchema = null; + encoder = null; + if (physicalExec != null) { + try { + physicalExec.close(); + } catch (Exception ignored) {} + } + physicalExec = null; + currentRow = -1; + } + + private List<Tuple> getTablespaces(Schema outSchema) { + List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces(); + List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (TablespaceProto tablespace: tablespaces) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + if ("space_id".equalsIgnoreCase(column.getSimpleName())) { + if (tablespace.hasId()) { + aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName())); + } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) { + if (tablespace.hasHandler()) { + aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if 
("space_uri".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri())); + } + } + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getDatabases(Schema outSchema) { + List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases(); + List<Tuple> tuples = new ArrayList<Tuple>(databases.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (DatabaseProto database: databases) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + if ("db_id".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(database.getId())); + } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(database.getName())); + } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) { + if (database.hasSpaceId()) { + aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getTables(Schema outSchema) { + List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables(); + List<Tuple> tuples = new ArrayList<Tuple>(tables.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (TableDescriptorProto table: tables) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + if ("tid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(table.getTid())); + } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId())); + } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, 
DatumFactory.createText(table.getName())); + } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) { + if (table.hasTableType()) { + aTuple.put(fieldId, DatumFactory.createText(table.getTableType())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("path".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(table.getPath())); + } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(table.getStoreType())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getColumns(Schema outSchema) { + List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns(); + List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + int columnId = 1, prevtid = -1, tid = 0; + + for (ColumnProto column: columnsList) { + aTuple = new VTuple(outSchema.size()); + + tid = column.getTid(); + if (prevtid != tid) { + columnId = 1; + prevtid = tid; + } + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column colObj = columns.get(fieldId); + + if ("tid".equalsIgnoreCase(colObj.getSimpleName())) { + if (column.hasTid()) { + aTuple.put(fieldId, DatumFactory.createInt4(tid)); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(column.getName())); + } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(columnId)); + } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString())); + } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) { + DataType dataType = column.getDataType(); + if (dataType.hasLength()) { + aTuple.put(fieldId, 
DatumFactory.createInt4(dataType.getLength())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + } + + columnId++; + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getIndexes(Schema outSchema) { + List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes(); + List<Tuple> tuples = new ArrayList<Tuple>(indexList.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (IndexProto index: indexList) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("db_id".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId())); + } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(index.getTId())); + } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexName())); + } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getColumnName())); + } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getDataType())); + } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexType())); + } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique())); + } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered())); + } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getAllTableOptions(Schema outSchema) { + 
List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions(); + List<Tuple> tuples = new ArrayList<Tuple>(optionList.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (TableOptionProto option: optionList) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("tid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(option.getTid())); + } else if ("key_".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey())); + } else if ("value_".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getAllTableStats(Schema outSchema) { + List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats(); + List<Tuple> tuples = new ArrayList<Tuple>(statList.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (TableStatsProto stat: statList) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("tid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid())); + } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows())); + } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> getAllPartitions(Schema outSchema) { + List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions(); + List<Tuple> tuples = new 
ArrayList<Tuple>(partitionList.size()); + List<Column> columns = outSchema.getColumns(); + Tuple aTuple; + + for (TablePartitionProto partition: partitionList) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("pid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid())); + } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid())); + } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) { + if (partition.hasPartitionName()) { + aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition())); + } else if ("path".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(partition.getPath())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } + + private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) { + List<Tuple> tuples = null; + String tableName = CatalogUtil.extractSimpleName(tableDesc.getName()); + + if ("tablespace".equalsIgnoreCase(tableName)) { + tuples = getTablespaces(inSchema); + } else if ("databases".equalsIgnoreCase(tableName)) { + tuples = getDatabases(inSchema); + } else if ("tables".equalsIgnoreCase(tableName)) { + tuples = getTables(inSchema); + } else if ("columns".equalsIgnoreCase(tableName)) { + tuples = getColumns(inSchema); + } else if ("indexes".equalsIgnoreCase(tableName)) { + tuples = getIndexes(inSchema); + } else if ("table_options".equalsIgnoreCase(tableName)) { + tuples = getAllTableOptions(inSchema); + } else if ("table_stats".equalsIgnoreCase(tableName)) { + tuples = getAllTableStats(inSchema); + } 
else if ("partitions".equalsIgnoreCase(tableName)) { + tuples = getAllPartitions(inSchema); + } + + return tuples; + } + + @Override + public List<ByteString> getNextRows(int fetchRowNum) throws IOException { + List<ByteString> rows = new ArrayList<ByteString>(); + int startRow = currentRow; + int endRow = startRow + fetchRowNum; + + if (physicalExec == null) { + return rows; + } + + while (currentRow < endRow) { + Tuple currentTuple = physicalExec.next(); + + if (currentTuple == null) { + physicalExec.close(); + physicalExec = null; + break; + } + + currentRow++; + rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple))); + + if (currentRow >= maxRow) { + physicalExec.close(); + physicalExec = null; + break; + } + } + + return rows; + } + + @Override + public QueryId getQueryId() { + return queryId; + } + + @Override + public String getSessionId() { + return sessionId; + } + + @Override + public TableDesc getTableDesc() { + return tableDesc; + } + + @Override + public Schema getLogicalSchema() { + return outSchema; + } + + class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl { + + public SimplePhysicalPlannerImpl(TajoConf conf) { + super(conf); + } + + @Override + public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) + throws IOException { + return new SystemPhysicalExec(ctx, scanNode); + } + + @Override + public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException { + return new SystemPhysicalExec(ctx, annotation); + } + } + + class SystemPhysicalExec extends PhysicalExec { + + private ScanNode scanNode; + private EvalNode qual; + private Projector projector; + private TableStats tableStats; + private final List<Tuple> cachedData; + private int currentRow; + private boolean isClosed; + + public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) { + super(context, scanNode.getInSchema(), scanNode.getOutSchema()); + this.scanNode = scanNode; + 
this.qual = this.scanNode.getQual(); + cachedData = TUtil.newList(); + currentRow = 0; + isClosed = false; + + projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); + } + + @Override + public Tuple next() throws IOException { + Tuple aTuple = null; + Tuple outTuple = new VTuple(outColumnNum); + + if (isClosed) { + return null; + } + + if (cachedData.size() == 0) { + rescan(); + } + + if (!scanNode.hasQual()) { + if (currentRow < cachedData.size()) { + aTuple = cachedData.get(currentRow++); + projector.eval(aTuple, outTuple); + outTuple.setOffset(aTuple.getOffset()); + return outTuple; + } + return null; + } else { + while (currentRow < cachedData.size()) { + aTuple = cachedData.get(currentRow++); + if (qual.eval(inSchema, aTuple).isTrue()) { + projector.eval(aTuple, outTuple); + return outTuple; + } + } + return null; + } + } + + @Override + public void rescan() throws IOException { + cachedData.clear(); + cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema)); + + tableStats = new TableStats(); + tableStats.setNumRows(cachedData.size()); + } + + @Override + public void close() throws IOException { + scanNode = null; + qual = null; + projector = null; + cachedData.clear(); + currentRow = -1; + isClosed = true; + } + + @Override + public float getProgress() { + return 1.0f; + } + + @Override + protected void compile() throws CompilationError { + if (scanNode.hasQual()) { + qual = context.getPrecompiledEval(inSchema, qual); + } + } + + @Override + public TableStats getInputStats() { + return tableStats; + } + + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index ee99353..c413b65 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -525,7 +525,7 @@ public class TajoMasterClientService extends AbstractService { List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum()); SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); - resultSetBuilder.setSchema(queryResultScanner.getTableDesc().getLogicalSchema().getProto()); + resultSetBuilder.setSchema(queryResultScanner.getLogicalSchema().getProto()); resultSetBuilder.addAllSerializedTuples(rows); builder.setResultSet(resultSetBuilder.build()); http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 10701f9..2242445 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -41,7 +41,9 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; +import org.apache.tajo.master.NonForwardQueryResultFileScanner; import org.apache.tajo.master.NonForwardQueryResultScanner; +import org.apache.tajo.master.NonForwardQueryResultSystemScanner; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; @@ -104,6 +106,8 @@ public class QueryExecutor { } else if (plan.isExplain()) { // explain query execExplain(plan, response); + } else if 
(PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) { + execQueryOnVirtualTable(queryContext, session, sql, plan, response); // Simple query indicates a form of 'select * from tb_name [LIMIT X];'. } else if (PlannerUtil.checkIfSimpleQuery(plan)) { @@ -183,6 +187,27 @@ public class QueryExecutor { response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); } + public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan, + SubmitQueryResponse.Builder response) throws Exception { + int maxRow = Integer.MAX_VALUE; + if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { + LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); + maxRow = (int) limitNode.getFetchFirstNum(); + } + QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId()); + + NonForwardQueryResultScanner queryResultScanner = + new NonForwardQueryResultSystemScanner(context, plan, queryId, session.getSessionId(), maxRow); + + queryResultScanner.init(); + session.addNonForwardQueryResultScanner(queryResultScanner); + + response.setQueryId(queryId.getProto()); + response.setMaxRowNum(maxRow); + response.setTableDesc(queryResultScanner.getTableDesc().getProto()); + response.setResultCode(ClientProtos.ResultCode.OK); + } + public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder response) throws Exception { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); @@ -202,7 +227,7 @@ public class QueryExecutor { QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId()); NonForwardQueryResultScanner queryResultScanner = - new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow); + new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow); queryResultScanner.init(); 
session.addNonForwardQueryResultScanner(queryResultScanner); http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java new file mode 100644 index 0000000..bdd6dfc --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.benchmark.TPCH; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LimitNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.KeyValueSet; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.protobuf.ByteString; + +public class TestNonForwardQueryResultSystemScanner { + + private class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> { + + private final Matcher<? extends T> matcher; + + public CollectionMatcher(Matcher<? 
extends T> matcher) { + this.matcher = matcher; + } + + @Override + public void describeTo(Description description) { + description.appendText("a collection containing ").appendDescriptionOf(this.matcher); + } + + @Override + protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) { + boolean isFirst = true; + Iterator<? extends T> iterator = item.iterator(); + + while (iterator.hasNext()) { + T obj = iterator.next(); + if (this.matcher.matches(obj)) { + return true; + } + + if (!isFirst) { + mismatchDescription.appendText(", "); + } + + this.matcher.describeMismatch(obj, mismatchDescription); + isFirst = false; + } + return false; + } + + } + + private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) { + return new CollectionMatcher<T>(matcher); + } + + private static LocalTajoTestingUtility testUtil; + private static TajoTestingCluster testingCluster; + private static TajoConf conf; + private static MasterContext masterContext; + + private static SQLAnalyzer analyzer; + private static LogicalPlanner logicalPlanner; + private static LogicalOptimizer logicalOptimizer; + + private static void setupTestingCluster() throws Exception { + testUtil = new LocalTajoTestingUtility(); + String[] names, paths; + Schema[] schemas; + + TPCH tpch = new TPCH(); + tpch.loadSchemas(); + tpch.loadQueries(); + + names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", + "region", "supplier", "empty_orders"}; + schemas = new Schema[names.length]; + for (int i = 0; i < names.length; i++) { + schemas[i] = tpch.getSchema(names[i]); + } + + File file; + paths = new String[names.length]; + for (int i = 0; i < names.length; i++) { + file = new File("src/test/tpch/" + names[i] + ".tbl"); + if(!file.exists()) { + file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i] + + ".tbl"); + } + paths[i] = file.getAbsolutePath(); + } + + KeyValueSet opt = new KeyValueSet(); + 
opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + testUtil.setup(names, paths, schemas, opt); + + testingCluster = testUtil.getTestingCluster(); + } + + @BeforeClass + public static void setUp() throws Exception { + setupTestingCluster(); + + conf = testingCluster.getConfiguration(); + masterContext = testingCluster.getMaster().getContext(); + + GlobalEngine globalEngine = masterContext.getGlobalEngine(); + analyzer = globalEngine.getAnalyzer(); + logicalPlanner = globalEngine.getLogicalPlanner(); + logicalOptimizer = globalEngine.getLogicalOptimizer(); + } + + @AfterClass + public static void tearDown() throws Exception { + try { + Thread.sleep(2000); + } catch (Exception ignored) { + } + + testUtil.shutdown(); + } + + private NonForwardQueryResultScanner getScanner(String sql) throws Exception { + QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); + String sessionId = UUID.randomUUID().toString(); + + return getScanner(sql, queryId, sessionId); + } + + private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception { + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + + Expr expr = analyzer.parse(sql); + LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr); + logicalOptimizer.optimize(logicalPlan); + + int maxRow = Integer.MAX_VALUE; + if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) { + LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT); + maxRow = (int) limitNode.getFetchFirstNum(); + } + + NonForwardQueryResultScanner queryResultScanner = + new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId, + sessionId, maxRow); + + return queryResultScanner; + } + + @Test + public void testInit() throws Exception { + QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); + String sessionId = 
UUID.randomUUID().toString(); + NonForwardQueryResultScanner queryResultScanner = + getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE", + queryId, sessionId); + + queryResultScanner.init(); + + assertThat(queryResultScanner.getQueryId(), is(notNullValue())); + assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue())); + assertThat(queryResultScanner.getSessionId(), is(notNullValue())); + assertThat(queryResultScanner.getTableDesc(), is(notNullValue())); + + assertThat(queryResultScanner.getQueryId(), is(queryId)); + assertThat(queryResultScanner.getSessionId(), is(sessionId)); + + assertThat(queryResultScanner.getLogicalSchema().size(), is(2)); + assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue())); + } + + private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) { + List<Tuple> tuples = new ArrayList<Tuple>(bytes.size()); + + for (ByteString byteString: bytes) { + Tuple aTuple = decoder.toTuple(byteString.toByteArray()); + tuples.add(aTuple); + } + + return tuples; + } + + private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) { + return new TypeSafeDiagnosingMatcher<Tuple>() { + + @Override + public void describeTo(Description description) { + description.appendDescriptionOf(matcher); + } + + @Override + protected boolean matchesSafely(Tuple item, Description mismatchDescription) { + Datum datum = item.get(fieldId); + Object itemValue = null; + + if (datum.type() == Type.TEXT) { + itemValue = datum.asChars(); + } else if (datum.type() == Type.INT4) { + itemValue = datum.asInt4(); + } else if (datum.type() == Type.INT8) { + itemValue = datum.asInt8(); + } + + if (itemValue != null && matcher.matches(itemValue)) { + return true; + } + + matcher.describeMismatch(itemValue, mismatchDescription); + return false; + } + }; + } + + @Test + public void testGetNextRowsForAggregateFunction() throws Exception { + NonForwardQueryResultScanner 
queryResultScanner = + getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES"); + + queryResultScanner.init(); + + List<ByteString> rowBytes = queryResultScanner.getNextRows(100); + + assertThat(rowBytes.size(), is(1)); + + RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); + List<Tuple> tuples = getTupleList(decoder, rowBytes); + + assertThat(tuples.size(), is(1)); + assertThat(tuples, hasItem(getTupleMatcher(0, is(9L)))); + } + + @Test + public void testGetNextRowsForTable() throws Exception { + NonForwardQueryResultScanner queryResultScanner = + getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES"); + + queryResultScanner.init(); + + List<ByteString> rowBytes = queryResultScanner.getNextRows(100); + + assertThat(rowBytes.size(), is(9)); + + RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); + List<Tuple> tuples = getTupleList(decoder, rowBytes);; + + assertThat(tuples.size(), is(9)); + assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem")))); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index eebee6f..9002f28 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -37,6 +37,7 @@ import org.apache.tajo.algebra.WindowSpec; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.NullDatum; import 
org.apache.tajo.plan.LogicalPlan.QueryBlock; @@ -1314,7 +1315,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex } private void updatePhysicalInfo(TableDesc desc) { - if (desc.getPath() != null) { + if (desc.getPath() != null && desc.getMeta().getStoreType() != StoreType.SYSTEM) { try { Path path = new Path(desc.getPath()); FileSystem fs = path.getFileSystem(new Configuration()); http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index d813432..0fbd359 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -139,6 +139,27 @@ public class PlannerUtil { (simpleOperator && noComplexComputation && isOneQueryBlock && noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); } + + /** + * Checks whether the target of this query is a virtual table or not. + * It will be removed after tajo storage supports catalog service access. + * + */ + public static boolean checkIfQueryTargetIsVirtualTable(LogicalPlan plan) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + + boolean hasScanNode = plan.getRootBlock().hasNode(NodeType.SCAN); + LogicalNode[] scanNodes = findAllNodes(rootNode, NodeType.SCAN); + boolean isVirtualTable = scanNodes.length > 0; + ScanNode scanNode = null; + + for (LogicalNode node: scanNodes) { + scanNode = (ScanNode) node; + isVirtualTable &= (scanNode.getTableDesc().getMeta().getStoreType() == StoreType.SYSTEM); + } + + return !checkIfDDLPlan(rootNode) && hasScanNode && isVirtualTable; + } /** * Checks whether the query has 'from clause' or not.
