http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java index 5e3d0b6..d490001 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java @@ -19,13 +19,13 @@ package org.apache.tajo.master.exec.prehook; import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.Tablespace; public class CreateTableHook implements DistributedQueryHook { @@ -43,8 +43,10 @@ public class CreateTableHook implements DistributedQueryHook { String databaseName = splitted[0]; String tableName = splitted[1]; queryContext.setOutputTable(tableName); - queryContext.setOutputPath( - StorageUtil.concatPath(TajoConf.getWarehouseDir(queryContext.getConf()), databaseName, tableName)); + + // set the final output table uri + queryContext.setOutputPath(createTableNode.getUri()); + if(createTableNode.getPartitionMethod() != null) { queryContext.setPartitionMethod(createTableNode.getPartitionMethod()); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java index 3dba176..f403092 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java @@ -37,7 +37,7 @@ public class DistributedQueryHookManager { try { hook.hook(queryContext, plan); } catch (Throwable t) { - t.printStackTrace(); + throw new RuntimeException(t); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java index 14c4d8d..c309f57 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java @@ -39,23 +39,21 @@ public class InsertIntoHook implements DistributedQueryHook { // Set QueryContext settings, such as output table name and output path. // It also remove data files if overwrite is true. - Path outputPath; if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME] queryContext.setOutputTable(insertNode.getTableName()); - queryContext.setOutputPath(insertNode.getPath()); if (insertNode.hasPartition()) { queryContext.setPartitionMethod(insertNode.getPartitionMethod()); } } else { // INSERT INTO LOCATION ... // When INSERT INTO LOCATION, must not set output table. - outputPath = insertNode.getPath(); queryContext.setFileOutput(); - queryContext.setOutputPath(outputPath); } + // Set the final output table uri + queryContext.setOutputPath(insertNode.getUri()); + if (insertNode.isOverwrite()) { queryContext.setOutputOverwrite(); } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 23808b5..4fef02c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.state.*; import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; @@ -48,6 +49,7 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.Tablespace; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.util.history.StageHistory; @@ -427,40 +429,59 @@ public class Query implements EventHandler<QueryEvent> { } else { finalState = QueryState.QUERY_ERROR; } + + // When a query is failed if (finalState != QueryState.QUERY_SUCCEEDED) { Stage lastStage = query.getStage(stageEvent.getExecutionBlockId()); - if (lastStage != null && lastStage.getTableMeta() != null) { - String storeType = lastStage.getTableMeta().getStoreType(); - if (storeType != null) { - LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); - try { - TableSpaceManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild()); - } catch (IOException e) { - LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); - } - } - } + handleQueryFailure(query, lastStage); } + query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); query.setFinishTime(); return finalState; } + // handle query failures + private void handleQueryFailure(Query query, Stage lastStage) { + QueryContext context = query.context.getQueryContext(); + + if (lastStage != null && context.hasOutputTableUri()) { + Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get(); + try { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + space.rollbackTable(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { Stage lastStage = query.getStage(event.getExecutionBlockId()); - String storeType = lastStage.getTableMeta().getStoreType(); + try { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - Path finalOutputDir = TableSpaceManager.getStorageManager(query.systemConf, storeType) - .commitOutputData(query.context.getQueryContext(), - lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc); + QueryContext queryContext = query.context.getQueryContext(); + + // If there is not tabledesc, it is a select query without insert or ctas. + // In this case, we should use default tablespace. + Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); + + Path finalOutputDir = space.commitTable( + query.context.getQueryContext(), + lastStage.getId(), + lastStage.getMasterPlan().getLogicalPlan(), + lastStage.getSchema(), + tableDesc); QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); + } catch (Exception e) { query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 2809a70..84f2eac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -18,6 +18,7 @@ package org.apache.tajo.querymaster; +import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -51,14 +52,9 @@ import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.session.Session; -import org.apache.tajo.storage.Tablespace; -import org.apache.tajo.storage.StorageProperty; -import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.*; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; @@ -308,43 +304,28 @@ public class QueryMasterTask extends CompositeService { } public synchronized void startQuery() { - Tablespace sm = null; LogicalPlan plan = null; + Tablespace space = null; try { if (query != null) { LOG.warn("Query already started"); return; } + + CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); - LogicalPlanner planner = new LogicalPlanner(catalog); + LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM - plan = planner.createPlan(queryContext, expr); - - String storeType = PlannerUtil.getStoreType(plan); - if (storeType != null) { - sm = TableSpaceManager.getStorageManager(systemConf, storeType); - StorageProperty storageProperty = sm.getStorageProperty(); - if (storageProperty.isSortedInsert()) { - String tableName = PlannerUtil.getStoreTableName(plan); - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - if (tableDesc == null) { - throw new VerifyException("Can't get table meta data from catalog: " + tableName); - } - List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( - getQueryTaskContext().getQueryContext(), tableDesc); - if (storageSpecifiedRewriteRules != null) { - for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { - optimizer.addRuleAfterToJoinOpt(eachRule); - } - } - } - } + plan = planner.createPlan(queryContext, expr); optimizer.optimize(queryContext, plan); + // when a given uri is null, TableSpaceManager.get will return the default tablespace. + space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get(); + space.rewritePlan(queryContext, plan); + for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN); if (scanNodes != null) { @@ -374,10 +355,10 @@ public class QueryMasterTask extends CompositeService { LOG.error(t.getMessage(), t); initError = t; - if (plan != null && sm != null) { + if (plan != null && space != null) { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); try { - sm.rollbackOutputCommit(rootNode.getChild()); + space.rollbackTable(rootNode.getChild()); } catch (IOException e) { LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); } @@ -422,16 +403,27 @@ public class QueryMasterTask extends CompositeService { // Create Output Directory //////////////////////////////////////////// - String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); - if (context.isCreateTable() || context.isInsert()) { - if (outputPath == null || outputPath.isEmpty()) { - // hbase - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); + + // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO + // So, this query results won't be materialized as a part of a table. + // The result will be temporarily written in the staging directory. + if (outputPath.isEmpty()) { + // for temporarily written in the storage directory + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } else { + Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath); + if (!spaceResult.isPresent()) { + throw new IOException("No registered Tablespace for " + outputPath); + } + + Tablespace space = spaceResult.get(); + if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation + // If this space allows move operation, the staging directory will be underneath the final output table uri. + stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId); } else { - stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); } - } else { - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); } // initializ http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index ec09145..c77e6f4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -36,7 +36,6 @@ import org.apache.tajo.engine.planner.UniformRangePartition; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil; import org.apache.tajo.engine.utils.TupleUtil; @@ -83,19 +82,14 @@ public class Repartitioner { QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); ScanNode[] scans = execBlock.getScanNodes(); - - Path tablePath; Fragment[] fragments = new Fragment[scans.length]; long[] stats = new long[scans.length]; // initialize variables from the child operators for (int i = 0; i < scans.length; i++) { TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); - if (tableDesc == null) { // if it is a real table stored on storage - FileTablespace storageManager = - (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()); - tablePath = storageManager.getTablePath(scans[i].getTableName()); + if (tableDesc == null) { // if it is a real table stored on storage if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) { ExecutionBlockId originScanEbId = unionScanEntry.getKey(); @@ -105,25 +99,29 @@ public class Repartitioner { ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName()); stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes(); } - fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); + + // TODO - We should remove dummy flagment usages + fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0, + new String[]{UNKNOWN_HOST}); + } else { + try { stats[i] = GlobalPlanRewriteUtil.computeDescendentVolume(scans[i]); } catch (PlanningException e) { throw new IOException(e); } - Tablespace tablespace = - TableSpaceManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType()); - // if table has no data, tablespace will return empty FileFragment. // So, we need to handle FileFragment by its size. // If we don't check its size, it can cause IndexOutOfBoundsException. - List<Fragment> fileFragments = tablespace.getSplits(scans[i].getCanonicalName(), tableDesc); + Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc); if (fileFragments.size() > 0) { fragments[i] = fileFragments.get(0); } else { - fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST}); + fragments[i] = new FileFragment(scans[i].getCanonicalName(), + new Path(tableDesc.getUri()), 0, 0, new String[]{UNKNOWN_HOST}); } } } @@ -377,26 +375,29 @@ public class Repartitioner { if (broadcastFragments != null) { //In this phase a ScanNode has a single fragment. //If there are more than one data files, that files should be added to fragments or partition path + for (ScanNode eachScan: broadcastScans) { + Path[] partitionScanPaths = null; TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); + Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get(); + if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { - FileTablespace storageManager = - (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()); PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() - getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc); + getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc); partitionScan.setInputPaths(partitionScanPaths); + } else { - Tablespace tablespace = TableSpaceManager.getStorageManager(stage.getContext().getConf(), - tableDesc.getMeta().getStoreType()); - Collection<Fragment> scanFragments = tablespace.getSplits(eachScan.getCanonicalName(), + + Collection<Fragment> scanFragments = space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan); if (scanFragments != null) { rightFragments.addAll(scanFragments); } + } } } @@ -505,18 +506,16 @@ public class Repartitioner { Collection<Fragment> scanFragments; Path[] partitionScanPaths = null; + + FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get(); + if (scan.getType() == NodeType.PARTITIONS_SCAN) { PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() - FileTablespace storageManager = - (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()); - scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc); + scanFragments = getFragmentsFromPartitionedTable(space, scan, desc); } else { - Tablespace tablespace = - TableSpaceManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType()); - - scanFragments = tablespace.getSplits(scan.getCanonicalName(), desc, scan); + scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan); } if (scanFragments != null) { @@ -618,9 +617,6 @@ public class Repartitioner { throws IOException { ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; - Path tablePath; - tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf())) - .getTablePath(scan.getTableName()); ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0); SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT); @@ -648,10 +644,15 @@ public class Repartitioner { throw new IOException("Can't get table meta data from catalog: " + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); } - ranges = TableSpaceManager.getStorageManager(stage.getContext().getConf(), storeType) - .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc, - sortNode.getInSchema(), sortSpecs, - mergedRange); + + Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get(); + ranges = space.getInsertSortRanges( + stage.getContext().getQueryContext(), + tableDesc, + sortNode.getInSchema(), + sortSpecs, + mergedRange); + determinedTaskNum = ranges.length; } else { RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); @@ -688,7 +689,9 @@ public class Repartitioner { } } - FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); + // TODO - We should remove dummy fragment. + FileFragment dummyFragment = new FileFragment(scan.getTableName(), new Path("/dummy"), 0, 0, + new String[]{UNKNOWN_HOST}); Stage.scheduleFragment(stage, dummyFragment); List<FetchImpl> fetches = new ArrayList<FetchImpl>(); @@ -784,11 +787,9 @@ public class Repartitioner { int maxNum) throws IOException { ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; - Path tablePath; - tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf())) - .getTablePath(scan.getTableName()); - Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); + // TODO - We should remove dummy fragment usages + Fragment frag = new FileFragment(scan.getCanonicalName(), new Path("/dummy"), 0, 0, new String[]{UNKNOWN_HOST}); List<Fragment> fragments = new ArrayList<Fragment>(); fragments.add(frag); Stage.scheduleFragments(stage, fragments); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 5a0fc38..a7d605c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -60,8 +60,8 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.storage.FileTablespace; -import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; @@ -1084,18 +1084,18 @@ public class Stage implements EventHandler<StageEvent> { Collection<Fragment> fragments; TableMeta meta = table.getMeta(); + Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get(); + // Depending on scanner node's type, it creates fragments. If scan is for // a partitioned table, It will creates lots fragments for all partitions. // Otherwise, it creates at least one fragments for a table, which may // span a number of blocks or possibly consists of a number of files. + // + // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN. if (scan.getType() == NodeType.PARTITIONS_SCAN) { // After calling this method, partition paths are removed from the physical plan. - FileTablespace storageManager = - (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()); - fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table); + fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table); } else { - Tablespace tablespace = - TableSpaceManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType()); fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index f265e50..ae22d0d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -486,7 +486,7 @@ public class QueryExecutorServlet extends HttpServlet { if (resultRows <= 0) { resultRows = 1000; } - LOG.info("Tajo Query Result: " + desc.getPath() + "\n"); + LOG.info("Tajo Query Result: " + desc.getUri() + "\n"); int numOfColumns = rsmd.getColumnCount(); for(int i = 0; i < numOfColumns; i++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java index 0721ef1..0df5d4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java @@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) + Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get()) .getAppenderFilePath(getId(), queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index fbd070e..66c8e4a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -40,7 +40,6 @@ import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.service.ServiceTrackerException; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.service.TajoMasterInfo; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; @@ -56,7 +55,7 @@ import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.OldStorageManager; import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; @@ -370,7 +369,7 @@ public class TajoWorker extends CompositeService { } try { - TableSpaceManager.shutdown(); + OldStorageManager.shutdown(); } catch (IOException ie) { LOG.error(ie.getMessage(), ie); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index be3960b..5974693 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -154,7 +154,7 @@ public class TaskImpl implements Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) + Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get()) .getAppenderFilePath(getId(), queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/resources/webapps/admin/catalogview.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp index 43ec5ca..6771912 100644 --- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp +++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp @@ -30,7 +30,6 @@ <%@ page import="java.util.Collection" %> <%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> -<%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="java.net.InetSocketAddress" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -186,7 +185,7 @@ <div style='margin-top:10px'> <div style=''>Detail</div> <table border="1" class='border_table'> - <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getPath()%></td></tr> + <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getUri()%></td></tr> <tr><td>Store type</td><td><%=tableDesc.getMeta().getStoreType()%></td></tr> <tr><td># rows</td><td><%=(tableDesc.hasStats() ? ("" + tableDesc.getStats().getNumRows()) : "-")%></td></tr> <tr><td>Volume</td><td><%=(tableDesc.hasStats() ? FileUtil.humanReadableByteCount(tableDesc.getStats().getNumBytes(),true) : "-")%></td></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index e0cf876..43bb6c1 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -23,20 +23,19 @@ <%@ page import="org.apache.tajo.conf.TajoConf" %> <%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.service.ServiceTracker" %> -<%@ page import="org.apache.tajo.service.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> +<%@ page import="org.apache.tajo.service.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.storage.TableSpaceManager" %> +<%@ page import="org.apache.tajo.storage.Tablespace" %> <%@ page import="org.apache.tajo.util.NetUtils" %> <%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="java.util.List" %> -<%@ page import="java.util.Collection" %> +<%@ page import="java.net.InetSocketAddress" %> <%@ page import="java.util.Date" %> +<%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> -<%@ page import="java.net.InetSocketAddress" %> -<%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -139,7 +138,15 @@ <tr><td width='150'>Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr> </table> <hr/> - + <h3>Tablespaces</h3> + <table width="100%" class="border_table" border="1"> + <tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr> + <% for (Tablespace space : TableSpaceManager.getAllTablespaces()) { + if (space.isVisible()) { %> + <tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr> + <% }}%> + </table> + <hr/> <h3>Cluster Summary</h3> <table width="100%" class="border_table" border="1"> <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java index b5be9d0..ca2378b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java @@ -46,7 +46,7 @@ public class BackendTestingUtil { public static void writeTmpTable(TajoConf conf, Path tablePath) throws IOException { - FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); + FileTablespace sm = TableSpaceManager.getDefault(); Appender appender; Path filePath = new Path(tablePath, "table.csv"); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 9a92e90..57b1e18 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -945,7 +945,7 @@ public class QueryTestCaseBase { return null; } - Path path = new Path(tableDesc.getPath()); + Path path = new Path(tableDesc.getUri()); return getTableFileContents(path); } @@ -955,7 +955,7 @@ public class QueryTestCaseBase { return null; } - Path path = new Path(tableDesc.getPath()); + Path path = new Path(tableDesc.getUri()); FileSystem fs = path.getFileSystem(conf); return listFiles(fs, path); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 9b5980b..acdae85 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -33,7 +33,6 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoClientUtil; @@ -48,16 +47,18 @@ import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.Pair; import org.apache.tajo.worker.TajoWorker; import java.io.File; import java.io.IOException; import java.io.Writer; import java.net.InetSocketAddress; -import java.net.URISyntaxException; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; @@ -345,10 +346,18 @@ public class TajoTestingCluster { LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI)); if (!local) { - c.setVar(ConfVars.ROOT_DIR, - getMiniDFSCluster().getFileSystem().getUri() + "/tajo"); + c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo"); } else { - c.setVar(ConfVars.ROOT_DIR, testBuildDir.getAbsolutePath() + "/tajo"); + c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo"); + } + + // Do not need for local file system + if (!local) { + FileTablespace defaultTableSpace = + new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri()); + defaultTableSpace.init(conf); + + TableSpaceManager.addTableSpaceForTest(defaultTableSpace); } setupCatalogForTesting(c, testBuildDir); @@ -441,13 +450,6 @@ public class TajoTestingCluster { } } - public void restartTajoCluster(int numSlaves) throws Exception { - tajoMaster.stop(); - tajoMaster.start(); - - LOG.info("Minicluster has been restarted"); - } - public TajoMaster getMaster() { return this.tajoMaster; } @@ -653,7 +655,14 @@ public class TajoTestingCluster { if (!fs.exists(rootDir)) { fs.mkdirs(rootDir); } - Path tablePath = new Path(rootDir, tableName); + Path tablePath; + if (CatalogUtil.isFQTableName(tableName)) { + Pair<String, String> name = CatalogUtil.separateQualifierAndName(tableName); + tablePath = new Path(rootDir, new Path(name.getFirst(), name.getSecond())); + } else { + tablePath = new Path(rootDir, tableName); + } + fs.mkdirs(tablePath); if (tableDatas.length > 0) { int recordPerFile = tableDatas.length / numDataFiles; http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 54e50fc..ce951c6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.FileUtil; import org.junit.After; import org.junit.Before; @@ -214,10 +215,9 @@ public class TestTajoCli { String consoleResult = new String(out.toByteArray()); - FileSystem fs = FileSystem.get(testBase.getTestingCluster().getConfiguration()); if (!cluster.isHiveCatalogStoreRunning()) { assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"}, - new String[]{fs.getUri() + "/tajo/warehouse/default/" + tableName}); + new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()}); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 765a084..73b97fa 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -324,7 +324,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -345,7 +345,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -422,7 +422,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -446,7 +446,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -470,7 +470,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -495,7 +495,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -521,7 +521,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -574,7 +574,7 @@ public class TestTajoClient { client.updateQuery(sql); assertTrue(client.existTable(tableName)); - Path tablePath = new Path(client.getTableDesc(tableName).getPath()); + Path tablePath = new Path(client.getTableDesc(tableName).getUri()); FileSystem hdfs = tablePath.getFileSystem(conf); assertTrue(hdfs.exists(tablePath)); @@ -704,7 +704,7 @@ public class TestTajoClient { assertEquals(resultDesc.getMeta().getOption(StorageConstants.TEXT_NULL), "\\\\T"); - Path path = new Path(resultDesc.getPath()); + Path path = new Path(resultDesc.getUri()); FileSystem fs = path.getFileSystem(tajoConf); FileStatus[] files = fs.listStatus(path); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 66aedc0..328f883 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -50,6 +50,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.storage.LazyTuple; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.BytesUtils; @@ -103,7 +104,7 @@ public class ExprTestBase { analyzer = new SQLAnalyzer(); preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat); - planner = new LogicalPlanner(cat); + planner = new LogicalPlanner(cat, TableSpaceManager.getInstance()); optimizer = new LogicalOptimizer(util.getConfiguration()); annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java index 4bfe640..80f3459 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java @@ -45,6 +45,7 @@ import org.apache.tajo.plan.function.GeneralFunction; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.nameresolver.NameResolvingMode; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.CommonTestingUtil; import org.junit.AfterClass; @@ -116,7 +117,7 @@ public class TestEvalTreeUtil { catalog.createFunction(funcMeta); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); String[] QUERIES = { "select name, score, age from people where score > 30", // 0 http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java index a408fd6..9aa7ddf 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java @@ -24,7 +24,6 @@ import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.engine.function.builtin.SumInt; @@ -35,6 +34,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.AfterClass; @@ -103,7 +103,7 @@ public class TestLogicalOptimizer { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); optimizer = new LogicalOptimizer(util.getConfiguration()); defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java index cee1593..3cee816 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.graph.SimpleDirectedGraph; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -39,7 +40,7 @@ public class TestLogicalPlan { public static void setup() throws Exception { util = new TajoTestingCluster(); util.startCatalogCluster(); - planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog()); + planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance()); } public static void tearDown() { http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 1feea4c..351a6af 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.QueryVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; @@ -29,7 +30,6 @@ import org.apache.tajo.algebra.JoinType; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.engine.function.FunctionLoader; @@ -42,6 +42,7 @@ import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; @@ -130,7 +131,7 @@ public class TestLogicalPlanner { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); } @AfterClass @@ -155,6 +156,13 @@ public class TestLogicalPlanner { "select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13 }; + private static QueryContext createQueryContext() { + QueryContext qc = new QueryContext(util.getConfiguration(), session); + qc.put(QueryVars.DEFAULT_SPACE_URI, "file:/"); + qc.put(QueryVars.DEFAULT_SPACE_ROOT_URI, "file:/"); + return qc; + } + public static final void testCloneLogicalNode(LogicalNode n1) throws CloneNotSupportedException { LogicalNode copy = (LogicalNode) n1.clone(); assertTrue(n1.deepEquals(copy)); @@ -162,7 +170,7 @@ public class TestLogicalPlanner { @Test public final void testSingleRelation() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[0]); LogicalPlan planNode = planner.createPlan(qc, expr); @@ -196,7 +204,7 @@ public class TestLogicalPlanner { @Test public final void testImplicityJoinPlan() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // two relations Expr expr = sqlAnalyzer.parse(QUERIES[1]); @@ -285,7 +293,7 @@ public class TestLogicalPlanner { @Test public final void testNaturalJoinPlan() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // two relations Expr context = sqlAnalyzer.parse(JOINS[0]); LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot(); @@ -317,7 +325,7 @@ public class TestLogicalPlanner { @Test public final void testInnerJoinPlan() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // two relations Expr expr = sqlAnalyzer.parse(JOINS[1]); LogicalPlan plan = planner.createPlan(qc, expr); @@ -350,7 +358,7 @@ public class TestLogicalPlanner { @Test public final void testOuterJoinPlan() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // two relations Expr expr = sqlAnalyzer.parse(JOINS[2]); @@ -385,7 +393,7 @@ public class TestLogicalPlanner { @Test public final void testGroupby() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // without 'having clause' Expr context = sqlAnalyzer.parse(QUERIES[7]); @@ -429,7 +437,7 @@ public class TestLogicalPlanner { public final void testMultipleJoin() throws IOException, PlanningException { Expr expr = sqlAnalyzer.parse( FileUtil.readTextFile(new File("src/test/resources/queries/TestJoinQuery/testTPCHQ2Join.sql"))); - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); testJsonSerDerObject(plan); Schema expected = tpch.getOutSchema("q2"); @@ -488,7 +496,7 @@ public class TestLogicalPlanner { Expr expr = sqlAnalyzer.parse( FileUtil.readTextFile(new File ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual1.sql"))); - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); LogicalPlan plan = planner.createPlan(qc, expr); LogicalNode node = plan.getRootBlock().getRoot(); @@ -518,7 +526,7 @@ public class TestLogicalPlanner { } for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) { - if (!entry.getValue().booleanValue()) { + if (!entry.getValue()) { Preconditions.checkArgument(false, "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); } @@ -530,7 +538,7 @@ public class TestLogicalPlanner { Expr expr = sqlAnalyzer.parse( FileUtil.readTextFile(new File ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual2.sql"))); - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); LogicalPlan plan = planner.createPlan(qc,expr); LogicalNode node = plan.getRootBlock().getRoot(); @@ -559,7 +567,7 @@ public class TestLogicalPlanner { } for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) { - if (!entry.getValue().booleanValue()) { + if (!entry.getValue()) { Preconditions.checkArgument(false, "SelectionQual not found. -> required JoinQual:" + entry.getKey().toJson()); } @@ -571,7 +579,7 @@ public class TestLogicalPlanner { Expr expr = sqlAnalyzer.parse( FileUtil.readTextFile(new File ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual3.sql"))); - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); LogicalPlan plan = planner.createPlan(qc, expr); LogicalNode node = plan.getRootBlock().getRoot(); @@ -605,7 +613,7 @@ public class TestLogicalPlanner { } for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) { - if (!entry.getValue().booleanValue()) { + if (!entry.getValue()) { Preconditions.checkArgument(false, "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); } @@ -618,7 +626,7 @@ public class TestLogicalPlanner { Expr expr = sqlAnalyzer.parse( FileUtil.readTextFile(new File ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual4.sql"))); - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); LogicalPlan plan = planner.createPlan(qc, expr); LogicalNode node = plan.getRootBlock().getRoot(); @@ -675,14 +683,14 @@ public class TestLogicalPlanner { for (Map.Entry<BinaryEval, Boolean> entry : joinQualMap.entrySet()) { - if (!entry.getValue().booleanValue()) { + if (!entry.getValue()) { Preconditions.checkArgument(false, "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson()); } } for (Map.Entry<BinaryEval, Boolean> entry : scanMap.entrySet()) { - if (!entry.getValue().booleanValue()) { + if (!entry.getValue()) { Preconditions.checkArgument(false, "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson()); } @@ -709,7 +717,7 @@ public class TestLogicalPlanner { @Test public final void testStoreTable() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr context = sqlAnalyzer.parse(QUERIES[8]); @@ -727,7 +735,7 @@ public class TestLogicalPlanner { @Test public final void testOrderBy() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[4]); @@ -757,7 +765,7 @@ public class TestLogicalPlanner { @Test public final void testLimit() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[12]); @@ -779,7 +787,7 @@ public class TestLogicalPlanner { @Test public final void testSPJPush() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[5]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -801,7 +809,7 @@ public class TestLogicalPlanner { @Test public final void testSPJ() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[6]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -811,7 +819,7 @@ public class TestLogicalPlanner { @Test public final void testJson() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[9]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -833,7 +841,7 @@ public class TestLogicalPlanner { @Test public final void testVisitor() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); // two relations Expr expr = sqlAnalyzer.parse(QUERIES[1]); @@ -860,7 +868,7 @@ public class TestLogicalPlanner { @Test public final void testExprNode() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[10]); LogicalPlan rootNode = planner.createPlan(qc, expr); @@ -882,7 +890,7 @@ public class TestLogicalPlanner { @Test public final void testAsterisk() throws CloneNotSupportedException, PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(QUERIES[13]); LogicalPlan planNode = planner.createPlan(qc, expr); @@ -912,7 +920,7 @@ public class TestLogicalPlanner { @Test public final void testAlias1() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(ALIAS[0]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -940,7 +948,7 @@ public class TestLogicalPlanner { @Test public final void testAlias2() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(ALIAS[1]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -961,7 +969,7 @@ public class TestLogicalPlanner { @Test public final void testCreateTableDef() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(CREATE_TABLE[0]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -980,7 +988,7 @@ public class TestLogicalPlanner { assertEquals("score", def.getColumn(3).getSimpleName()); assertEquals(Type.FLOAT4, def.getColumn(3).getDataType().getType()); assertTrue("CSV".equalsIgnoreCase(createTable.getStorageType())); - assertEquals("/tmp/data", createTable.getPath().toString()); + assertEquals("/tmp/data", createTable.getUri().toString()); assertTrue(createTable.hasOptions()); assertEquals("|", createTable.getOptions().get("csv.delimiter")); } @@ -1047,7 +1055,7 @@ public class TestLogicalPlanner { @Test public final void testSetPlan() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(setStatements[0]); LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot(); @@ -1068,7 +1076,7 @@ public class TestLogicalPlanner { @Test public void testSetQualifier() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr context = sqlAnalyzer.parse(setQualifiers[0]); LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot(); @@ -1121,7 +1129,7 @@ public class TestLogicalPlanner { @Test public final void testInsertInto0() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[0]); LogicalPlan plan = planner.createPlan(qc, expr); @@ -1134,7 +1142,7 @@ public class TestLogicalPlanner { @Test public final void testInsertInto1() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[1]); LogicalPlan plan = planner.createPlan(qc, expr); @@ -1146,7 +1154,7 @@ public class TestLogicalPlanner { @Test public final void testInsertInto2() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[2]); LogicalPlan plan = planner.createPlan(qc, expr); @@ -1161,19 +1169,19 @@ public class TestLogicalPlanner { @Test public final void testInsertInto3() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[3]); LogicalPlan plan = planner.createPlan(qc, expr); assertEquals(1, plan.getQueryBlocks().size()); InsertNode insertNode = getInsertNode(plan); assertFalse(insertNode.isOverwrite()); - assertTrue(insertNode.hasPath()); + assertTrue(insertNode.hasUri()); } @Test public final void testInsertInto4() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[4]); LogicalPlan plan = planner.createPlan(qc, expr); @@ -1189,19 +1197,19 @@ public class TestLogicalPlanner { @Test public final void testInsertInto5() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[5]); LogicalPlan plan = planner.createPlan(qc, expr); assertEquals(1, plan.getQueryBlocks().size()); InsertNode insertNode = getInsertNode(plan); assertTrue(insertNode.isOverwrite()); - assertTrue(insertNode.hasPath()); + assertTrue(insertNode.hasUri()); } @Test public final void testInsertInto6() throws PlanningException { - QueryContext qc = new QueryContext(util.getConfiguration(), session); + QueryContext qc = createQueryContext(); Expr expr = sqlAnalyzer.parse(insertStatements[6]); LogicalPlan plan = planner.createPlan(qc, expr); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index 0082800..d62eed2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -39,6 +39,7 @@ import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; @@ -108,7 +109,7 @@ public class TestPlannerUtil { catalog.createFunction(funcDesc); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); } @AfterClass @@ -337,7 +338,7 @@ public class TestPlannerUtil { TableDesc tableDesc = new TableDesc(); tableDesc.setName("Test"); - tableDesc.setPath(path.toUri()); + tableDesc.setUri(path.toUri()); FileSystem fs = path.getFileSystem(util.getConfiguration()); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java index e7e4f7d..2464fb1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -86,7 +86,7 @@ public class TestBNLJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) + Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.init(); VTuple tuple = new VTuple(schema.size()); @@ -108,8 +108,7 @@ public class TestBNLJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV"); Path peoplePath = new Path(testDir, "people.csv"); - appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) - .getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < INNER_TUPLE_NUM; i += 2) { @@ -125,7 +124,7 @@ public class TestBNLJoinExec { people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); } @After @@ -150,10 +149,10 @@ public class TestBNLJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN); FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), - new Path(employee.getPath()), + new Path(employee.getUri()), Integer.MAX_VALUE); FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), - new Path(people.getPath()), + new Path(people.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testBNLCrossJoin"); @@ -183,9 +182,9 @@ public class TestBNLJoinExec { context).getRootBlock().getRoot(); FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), - new Path(employee.getPath()), Integer.MAX_VALUE); + new Path(employee.getUri()), Integer.MAX_VALUE); FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), - new Path(people.getPath()), Integer.MAX_VALUE); + new Path(people.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index 5a7ba6a..96a1f36 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -90,7 +90,7 @@ public class TestBSTIndexExec { Path workDir = CommonTestingUtil.getTestDir(); catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString()); catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf); + sm = TableSpaceManager.getLocalFs(); idxPath = new Path(workDir, "test.idx"); @@ -144,11 +144,11 @@ public class TestBSTIndexExec { TableDesc desc = new TableDesc( CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, - sm.getTablePath("employee").toUri()); + sm.getTableUri(TajoConstants.DEFAULT_DATABASE_NAME, "employee")); catalog.createTable(desc); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); optimizer = new LogicalOptimizer(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 8e2f234..d94d3f6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -82,7 +82,7 @@ public class TestExternalSortExec { TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV"); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) + Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); appender.init(); @@ -104,7 +104,7 @@ public class TestExternalSortExec { employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); catalog.createTable(employee); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); } @After @@ -120,7 +120,7 @@ public class TestExternalSortExec { @Test public final void testNext() throws IOException, PlanningException { FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), - new Path(employee.getPath()), Integer.MAX_VALUE); + new Path(employee.getUri()), Integer.MAX_VALUE); Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java index fb6fd02..21a101a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -104,8 +104,7 @@ public class TestFullOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV"); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) - .getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); VTuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -134,8 +133,7 @@ public class TestFullOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta("CSV"); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) - .getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); appender2.init(); VTuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -174,8 +172,7 @@ public class TestFullOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV"); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) - .getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); VTuple tuple3 = new VTuple(emp3Schema.size()); @@ -227,7 +224,7 @@ public class TestFullOuterHashJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV"); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)) + Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs()) .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); @@ -237,7 +234,7 @@ public class TestFullOuterHashJoinExec { catalog.createTable(phone3); analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); + planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance()); defaultContext = LocalTajoTestingUtility.createDummyContext(conf); } @@ -266,9 +263,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), + FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); @@ -305,9 +302,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); @@ -343,9 +340,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); - FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); @@ -382,9 +379,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
