http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java index e4b98d4..406550d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java @@ -21,6 +21,7 @@ package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; import java.util.*; @@ -81,9 +82,12 @@ public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorit @Override public void addFragment(FragmentPair fragmentPair) { String[] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = fragmentPair.getLeftFragment().getDiskIds(); + int[] diskIds = null; + if (fragmentPair.getLeftFragment() instanceof FileFragment) { + diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); + } for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds[i], fragmentPair); + addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair); } fragmentNum++; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 77e3257..01137aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -44,6 +44,7 @@ import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.FetchImpl; @@ -150,8 +151,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { super.stop(); } - private FileFragment[] fragmentsForNonLeafTask; - private FileFragment[] broadcastFragmentsForNonLeafTask; + private Fragment[] fragmentsForNonLeafTask; + private Fragment[] broadcastFragmentsForNonLeafTask; LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); public void schedule() { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java index 598b1c5..827386b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java @@ -19,7 +19,7 @@ package org.apache.tajo.master; import com.google.common.base.Objects; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; /** * FragmentPair consists of two fragments, a left fragment and a right fragment. @@ -29,23 +29,23 @@ import org.apache.tajo.storage.fragment.FileFragment; * For other queries, it is assumed to have only a left fragment. */ public class FragmentPair { - private FileFragment leftFragment; - private FileFragment rightFragment; + private Fragment leftFragment; + private Fragment rightFragment; - public FragmentPair(FileFragment left) { + public FragmentPair(Fragment left) { this.leftFragment = left; } - public FragmentPair(FileFragment left, FileFragment right) { + public FragmentPair(Fragment left, Fragment right) { this.leftFragment = left; this.rightFragment = right; } - public FileFragment getLeftFragment() { + public Fragment getLeftFragment() { return leftFragment; } - public FileFragment getRightFragment() { + public Fragment getRightFragment() { return rightFragment; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 399644c..a9624f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -40,6 +40,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; @@ -336,6 +337,15 @@ public class GlobalEngine extends AbstractService { responseBuilder.setResultCode(ClientProtos.ResultCode.OK); } } else { // it requires distributed execution. So, the query is forwarded to a query master. + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType); + StorageProperty storageProperty = sm.getStorageProperty(); + if (!storageProperty.isSupportsInsertInto()) { + throw new VerifyException("Inserting into non-file storage is not supported."); + } + sm.beforeInsertOrCATS(rootNode.getChild()); + } context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); @@ -348,6 +358,7 @@ public class GlobalEngine extends AbstractService { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); responseBuilder.setErrorMessage("Fail starting QueryMaster."); + LOG.error("Fail starting QueryMaster: " + sql); } else { responseBuilder.setIsForwarded(true); responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); @@ -356,7 +367,8 @@ public class GlobalEngine extends AbstractService { responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); } responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort()); - LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); + LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," + + " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); } } @@ -556,6 +568,7 @@ public class GlobalEngine extends AbstractService { LOG.info("============================================="); annotatedPlanVerifier.verify(queryContext, state, plan); + verifyInsertTableSchema(queryContext, state, plan); if (!state.verified()) { StringBuilder sb = new StringBuilder(); @@ -568,6 +581,25 @@ public class GlobalEngine extends AbstractService { return plan; } + private void verifyInsertTableSchema(QueryContext queryContext, VerificationState state, LogicalPlan plan) { + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + if (rootNode.getChild().getType() == NodeType.INSERT) { + try { + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + InsertNode iNode = rootNode.getChild(); + Schema outSchema = iNode.getChild().getOutSchema(); + + StorageManager.getStorageManager(queryContext.getConf(), storeType) + .verifyInsertTableSchema(tableDesc, outSchema); + } catch (Throwable t) { + state.addVerification(t.getMessage()); + } + } + } + } + /** * Alter a given table */ @@ -730,32 +762,18 @@ public class GlobalEngine extends AbstractService { meta = CatalogUtil.newTableMeta(createTable.getStorageType()); } - if(createTable.isExternal()){ + if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){ Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given."); - } else { - String databaseName; - String tableName; - if (CatalogUtil.isFQTableName(createTable.getTableName())) { - databaseName = CatalogUtil.extractQualifier(createTable.getTableName()); - tableName = CatalogUtil.extractSimpleName(createTable.getTableName()); - } else { - databaseName = queryContext.getCurrentDatabase(); - tableName = createTable.getTableName(); - } - - // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) - Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName); - createTable.setPath(tablePath); } - return createTableOnPath(queryContext, createTable.getTableName(), createTable.getTableSchema(), - meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists); + return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(), + createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(), + createTable.getPartitionMethod(), ifNotExists); } - public TableDesc createTableOnPath(QueryContext queryContext, String tableName, Schema schema, TableMeta meta, - Path path, boolean isExternal, PartitionMethodDesc partitionDesc, - boolean ifNotExists) - throws IOException { + public TableDesc createTable(QueryContext queryContext, String tableName, StoreType storeType, + Schema schema, TableMeta meta, Path path, boolean isExternal, + PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException { String databaseName; String simpleTableName; if (CatalogUtil.isFQTableName(tableName)) { @@ -779,39 +797,15 @@ public class GlobalEngine extends AbstractService { } } - FileSystem fs = path.getFileSystem(context.getConf()); - - if (isExternal) { - if(!fs.exists(path)) { - LOG.error("ERROR: " + path.toUri() + " does not exist"); - throw new IOException("ERROR: " + path.toUri() + " does not exist"); - } - } else { - fs.mkdirs(path); - } - - long totalSize = 0; - - try { - totalSize = sm.calculateSize(path); - } catch (IOException e) { - LOG.warn("Cannot calculate the size of the relation", e); - } - - TableStats stats = new TableStats(); - stats.setNumBytes(totalSize); - - if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing. - stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); - } - TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName), - schema, meta, path.toUri(), isExternal); - desc.setStats(stats); + schema, meta, (path != null ? path.toUri(): null), isExternal); + if (partitionDesc != null) { desc.setPartitionMethod(partitionDesc); } + StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists); + if (catalog.createTable(desc)) { LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")"); return desc; @@ -905,13 +899,13 @@ public class GlobalEngine extends AbstractService { } } - Path path = new Path(catalog.getTableDesc(qualifiedName).getPath()); + TableDesc tableDesc = catalog.getTableDesc(qualifiedName); catalog.dropTable(qualifiedName); if (purge) { try { - FileSystem fs = path.getFileSystem(context.getConf()); - fs.delete(path, true); + StorageManager.getStorageManager(queryContext.getConf(), + tableDesc.getMeta().getStoreType()).purgeTable(tableDesc); } catch (IOException e) { throw new InternalError(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java index 82fd6fc..56cf8e5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.TUtil; @@ -101,9 +102,12 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith @Override public void addFragment(FragmentPair fragmentPair) { String[] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = fragmentPair.getLeftFragment().getDiskIds(); + int[] diskIds = null; + if (fragmentPair.getLeftFragment() instanceof FileFragment) { + diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); + } for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds[i], fragmentPair); + addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair); } totalFragmentNum++; } @@ -276,23 +280,27 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith public void removeFragment(FragmentPair fragmentPair) { String [] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = fragmentPair.getLeftFragment().getDiskIds(); + int[] diskIds = null; + if (fragmentPair.getLeftFragment() instanceof FileFragment) { + diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); + } for (int i = 0; i < hosts.length; i++) { + int diskId = diskIds == null ? -1 : diskIds[i]; String normalizedHost = NetUtils.normalizeHost(hosts[i]); Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost); if (diskFragmentMap != null) { - FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]); + FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId); if (fragmentsPerDisk != null) { boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair); if (isRemoved) { if (fragmentsPerDisk.size() == 0) { - diskFragmentMap.remove(diskIds[i]); + diskFragmentMap.remove(diskId); if (diskFragmentMap.size() == 0) { fragmentHostMapping.remove(normalizedHost); } } - HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]); + HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId); if (totalHostPriority.containsKey(hostAndDisk)) { PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk); updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index b2883cc..cc99453 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; @@ -38,7 +39,9 @@ import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.QueryUnitAttempt; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.FetchImpl; @@ -197,15 +200,17 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { if (event.getType() == EventType.T_SCHEDULE) { if (event instanceof FragmentScheduleEvent) { FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; - Collection<FileFragment> rightFragments = castEvent.getRightFragments(); + Collection<Fragment> rightFragments = castEvent.getRightFragments(); if (rightFragments == null || rightFragments.isEmpty()) { scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null)); } else { - for (FileFragment eachFragment: rightFragments) { + for (Fragment eachFragment: rightFragments) { scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment)); } } - initDiskBalancer(castEvent.getLeftFragment().getHosts(), castEvent.getLeftFragment().getDiskIds()); + if (castEvent.getLeftFragment() instanceof FileFragment) { + initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds()); + } } else if (event instanceof FetchScheduleEvent) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; scheduledFetches.addFetch(castEvent.getFetches()); @@ -366,6 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { long taskSize = adjustTaskSize(); LOG.info("Adjusted task size: " + taskSize); + TajoConf conf = subQuery.getContext().getConf(); // host local, disk local String normalized = NetUtils.normalizeHost(host); Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID); @@ -376,13 +382,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { break; } - if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) { + if (assignedFragmentSize + + StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { break; } else { fragmentPairs.add(fragmentPair); - assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += fragmentPair.getRightFragment().getEndKey(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); } } scheduledFragments.removeFragment(fragmentPair); @@ -398,13 +405,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { break; } - if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) { + if (assignedFragmentSize + + StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { break; } else { fragmentPairs.add(fragmentPair); - assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += fragmentPair.getRightFragment().getEndKey(); + assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); } } scheduledFragments.removeFragment(fragmentPair); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java index 768528d..64081f3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java @@ -26,7 +26,6 @@ import org.apache.tajo.QueryUnitId; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.planner.physical.SeqScanExec; import org.apache.tajo.engine.query.QueryContext; @@ -34,6 +33,8 @@ import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -41,7 +42,7 @@ import java.util.ArrayList; import java.util.List; public class NonForwardQueryResultScanner { - private static final int MAX_FILE_NUM_PER_SCAN = 100; + private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; private QueryId queryId; private String sessionId; @@ -54,7 +55,7 @@ public class NonForwardQueryResultScanner { private TajoConf tajoConf; private ScanNode scanNode; - private int currentFileIndex = 0; + private int currentFragmentIndex = 0; public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId, QueryId queryId, @@ -76,23 +77,24 @@ public class NonForwardQueryResultScanner { } private void initSeqScanExec() throws IOException { - FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc, - currentFileIndex, MAX_FILE_NUM_PER_SCAN); - if (fragments != null && fragments.length > 0) { + List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) + .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + + if (fragments != null && !fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); this.taskContext = new TaskAttemptContext( new QueryContext(tajoConf), null, new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0), - fragments, null); + fragmentProtos, null); try { // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table. - scanExec = new SeqScanExec(taskContext, - StorageManager.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments); + scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos); } catch (CloneNotSupportedException e) { throw new IOException(e.getMessage(), e); } scanExec.init(); - currentFileIndex += fragments.length; + currentFragmentIndex += fragments.size(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 604cfe0..f307127 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -53,6 +53,7 @@ import org.apache.tajo.rule.EvaluationContext; import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.*; import org.apache.tajo.util.history.HistoryReader; @@ -110,7 +111,7 @@ public class TajoMaster extends CompositeService { private CatalogServer catalogServer; private CatalogService catalog; - private StorageManager storeManager; + private FileStorageManager storeManager; private GlobalEngine globalEngine; private AsyncDispatcher dispatcher; private TajoMasterClientService tajoMasterClientService; @@ -171,7 +172,7 @@ public class TajoMaster extends CompositeService { // check the system directory and create if they are not created. checkAndInitializeSystemDirectories(); diagnoseTajoMaster(); - this.storeManager = StorageManager.getStorageManager(systemConf); + this.storeManager = (FileStorageManager)StorageManager.getFileStorageManager(systemConf, null); catalogServer = new CatalogServer(FunctionLoader.load()); addIfService(catalogServer); @@ -422,7 +423,7 @@ public class TajoMaster extends CompositeService { return this.catalogServer; } - public StorageManager getStorageManager() { + public FileStorageManager getStorageManager() { return this.storeManager; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index b420a65..7014034 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -814,7 +814,8 @@ public class TajoMasterClientService extends AbstractService { TableDesc desc; try { - desc = context.getGlobalEngine().createTableOnPath(queryContext, request.getName(), schema, + desc = context.getGlobalEngine().createTable(queryContext, request.getName(), + meta.getStoreType(), schema, meta, path, true, partitionDesc, false); } catch (Exception e) { return TableResponse.newBuilder() http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java index 8cc17cb..9a7cc76 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java @@ -19,23 +19,23 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import java.util.Collection; public class FragmentScheduleEvent extends TaskSchedulerEvent { - private final FileFragment leftFragment; - private final Collection<FileFragment> rightFragments; + private final Fragment leftFragment; + private final Collection<Fragment> rightFragments; public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, - final FileFragment fragment) { + final Fragment fragment) { this(eventType, blockId, fragment, null); } public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, - final FileFragment leftFragment, - final Collection<FileFragment> rightFragments) { + final Fragment leftFragment, + final Collection<Fragment> rightFragments) { super(eventType, blockId); this.leftFragment = leftFragment; this.rightFragments = rightFragments; @@ -45,11 +45,11 @@ public class FragmentScheduleEvent extends TaskSchedulerEvent { return this.rightFragments != null && !this.rightFragments.isEmpty(); } - public FileFragment getLeftFragment() { + public Fragment getLeftFragment() { return leftFragment; } - public Collection<FileFragment> getRightFragments() { return rightFragments; } + public Collection<Fragment> getRightFragments() { return rightFragments; } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index f92001f..a048780 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -23,7 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.EventHandler; @@ -32,33 +31,28 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.plan.logical.CreateTableNode; -import org.apache.tajo.plan.logical.InsertNode; -import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.*; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.util.history.SubQueryHistory; import java.io.IOException; -import java.text.NumberFormat; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -74,7 +68,6 @@ public class Query implements EventHandler<QueryEvent> { private Map<ExecutionBlockId, SubQuery> subqueries; private final EventHandler eventHandler; private final MasterPlan plan; - private final StorageManager sm; QueryMasterTask.QueryMasterTaskContext context; private ExecutionBlockCursor cursor; @@ -216,7 +209,6 @@ public class Query implements EventHandler<QueryEvent> { subqueries = Maps.newHashMap(); this.eventHandler = eventHandler; this.plan = plan; - this.sm = context.getStorageManager(); this.cursor = new ExecutionBlockCursor(plan, true); StringBuilder sb = new StringBuilder("\n======================================================="); @@ -398,7 +390,7 @@ public class Query implements EventHandler<QueryEvent> { query.setStartTime(); SubQuery subQuery = new SubQuery(query.context, query.getPlan(), - query.getExecutionBlockCursor().nextBlock(), query.sm); + query.getExecutionBlockCursor().nextBlock()); subQuery.setPriority(query.priority--); query.addSubQuery(subQuery); @@ -423,6 +415,20 @@ public class Query implements EventHandler<QueryEvent> { } else { finalState = QueryState.QUERY_ERROR; } + if (finalState != QueryState.QUERY_SUCCEEDED) { + SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId()); + if (lastStage != null && lastStage.getTableMeta() != null) { + StoreType storeType = lastStage.getTableMeta().getStoreType(); + if (storeType != null) { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + try { + StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + } query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); query.setFinishTime(); @@ -430,356 +436,27 @@ public class Query implements EventHandler<QueryEvent> { } private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { - MasterPlan masterPlan = query.getPlan(); + SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId()); + StoreType storeType = lastStage.getTableMeta().getStoreType(); + try { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - ExecutionBlock terminal = query.getPlan().getTerminalBlock(); - DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId()); + Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType) + .commitOutputData(query.context.getQueryContext(), + lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc); - QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); - try { - Path finalOutputDir = commitOutputData(query); + QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); - } catch (Throwable t) { - query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t))); + } catch (Exception e) { + query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } return QueryState.QUERY_SUCCEEDED; } - /** - * It moves a result data stored in a staging output dir into a final output dir. - */ - public Path commitOutputData(Query query) throws IOException { - QueryContext queryContext = query.context.getQueryContext(); - Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; - if (queryContext.hasOutputPath()) { - finalOutputDir = queryContext.getOutputPath(); - try { - FileSystem fs = stagingResultDir.getFileSystem(query.systemConf); - - if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); - - if (queryContext.hasPartition() && summary.getFileCount() > 0L) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = TUtil.newHashMap(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map<Path, Path> recoveryDirs = TUtil.newHashMap(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } - - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } - - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { - - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. - if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); - - for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } - - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } - - // Move the results to the final output dir. - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - - // Check the final output dir - committed = fs.exists(finalOutputDir); - - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } - - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } - - throw new IOException(ioe.getMessage()); - } - } - } else { - NodeType queryType = queryContext.getCommandType(); - - if (queryType == NodeType.INSERT) { // INSERT INTO an existing table - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - - if (queryContext.hasPartition()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } - } - - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = queryContext.getStagingDir().getParent(); - fs.delete(stagingDirRoot, true); - - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); - } - } else { - finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME); - } - - return finalOutputDir; - } - - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. - String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } - } - } - } - - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; - - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; - } - } - - return retValue; - } - - private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - if (eachFile.isFile()) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - return false; - } else { - if (verifyAllFileMoved(fs, eachFile.getPath())) { - fs.delete(eachFile.getPath(), false); - } else { - return false; - } - } - } - } - - return true; - } - - /** - * Attach the sequence number to a path. - * - * @param path Path - * @param seq sequence number - * @param nf Number format - * @return New path attached with sequence number - * @throws IOException - */ - private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { - String[] tokens = path.getName().split("-"); - if (tokens.length != 4) { - throw new IOException("Wrong result file name:" + path); - } - return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); - } - - /** - * Attach the sequence number to the output file name and than move the file into the final result path. - * - * @param fs FileSystem - * @param stagingResultDir The staging result dir - * @param fileStatus The file status - * @param finalOutputPath Final output path - * @param nf Number format - * @param fileSeq The sequence number - * @throws IOException - */ - private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq) throws IOException { - if (fileStatus.isDirectory()) { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (!fs.exists(finalSubPath)) { - fs.mkdirs(finalSubPath); - } - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); - for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq); - } - } else { - throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); - } - } else { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); - if (!fs.exists(finalSubPath.getParent())) { - fs.mkdirs(finalSubPath.getParent()); - } - if (fs.exists(finalSubPath)) { - throw new IOException("Already exists data file:" + finalSubPath); - } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); - if (success) { - LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } else { - LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } - } - } - } - - private String extractSubPath(Path parentPath, Path childPath) { - String parentPathStr = parentPath.toUri().getPath(); - String childPathStr = childPath.toUri().getPath(); - - if (parentPathStr.length() > childPathStr.length()) { - return null; - } - - int index = childPathStr.indexOf(parentPathStr); - if (index != 0) { - return null; - } - - return childPathStr.substring(parentPathStr.length() + 1); - } - private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, @@ -947,7 +624,7 @@ public class Query implements EventHandler<QueryEvent> { private void executeNextBlock(Query query) { ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); ExecutionBlock nextBlock = cursor.nextBlock(); - SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm); + SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock); nextSubQuery.setPriority(query.priority--); query.addSubQuery(nextSubQuery); nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT)); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index f2d8b3a..42fac3a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -41,7 +41,6 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.history.QueryHistory; @@ -71,8 +70,6 @@ public class QueryMaster extends CompositeService implements EventHandler { private GlobalPlanner globalPlanner; - private StorageManager storageManager; - private TajoConf systemConf; private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap(); @@ -116,8 +113,6 @@ public class QueryMaster extends CompositeService implements EventHandler { this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis()); addIfService(dispatcher); - this.storageManager = StorageManager.getStorageManager(systemConf); - globalPlanner = new GlobalPlanner(systemConf, workerContext); dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); @@ -373,10 +368,6 @@ public class QueryMaster extends CompositeService implements EventHandler { return clock; } - public StorageManager getStorageManager() { - return storageManager; - } - public QueryMaster getQueryMaster() { return QueryMaster.this; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 75d8ab6..1eaef0f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -35,10 +35,13 @@ import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.JsonHelper; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.rewrite.RewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.plan.logical.LogicalNode; @@ -54,10 +57,12 @@ import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; @@ -348,6 +353,8 @@ public class QueryMasterTask extends CompositeService { } public synchronized void startQuery() { + StorageManager sm = null; + LogicalPlan plan = null; try { if (query != null) { LOG.warn("Query already started"); @@ -358,7 +365,29 @@ public class QueryMasterTask extends CompositeService { LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM - LogicalPlan plan = planner.createPlan(queryContext, expr); + plan = planner.createPlan(queryContext, expr); + + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + sm = StorageManager.getStorageManager(systemConf, storeType); + StorageProperty storageProperty = sm.getStorageProperty(); + if (storageProperty.isSortedInsert()) { + String tableName = PlannerUtil.getStoreTableName(plan); + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new VerifyException("Can't get table meta data from catalog: " + tableName); + } + List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( + getQueryTaskContext().getQueryContext(), tableDesc); + if (storageSpecifiedRewriteRules != null) { + for (RewriteRule eachRule: storageSpecifiedRewriteRules) { + optimizer.addRuleAfterToJoinOpt(eachRule); + } + } + } + } + optimizer.optimize(queryContext, plan); GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager(); @@ -393,6 +422,15 @@ public class QueryMasterTask extends CompositeService { } catch (Throwable t) { LOG.error(t.getMessage(), t); initError = t; + + if (plan != null && sm != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + try { + sm.rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } } } @@ -441,8 +479,14 @@ public class QueryMasterTask extends CompositeService { // Create Output Directory //////////////////////////////////////////// + String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); if (context.isCreateTable() || context.isInsert()) { - stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + if (outputPath == null || outputPath.isEmpty()) { + // hbase + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } else { + stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + } } else { stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); } @@ -570,10 +614,6 @@ public class QueryMasterTask extends CompositeService { return queryId; } - public StorageManager getStorageManager() { - return queryMasterContext.getStorageManager(); - } - public Path getStagingDir() { return queryContext.getStagingDir(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 0f275e9..75402c2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -43,6 +43,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TajoIdUtils; @@ -265,8 +266,13 @@ public class QueryUnit implements EventHandler<TaskEvent> { List<String> fragmentList = new ArrayList<String>(); for (FragmentProto eachFragment : getAllFragments()) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); - fragmentList.add(fileFragment.toString()); + try { + Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment); + fragmentList.add(fragment.toString()); + } catch (Exception e) { + LOG.error(e.getMessage()); + fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage()); + } } queryUnitHistory.setFragments(fragmentList.toArray(new String[]{})); @@ -313,15 +319,18 @@ public class QueryUnit implements EventHandler<TaskEvent> { } } - private void addDataLocation(FileFragment fragment) { + private void addDataLocation(Fragment fragment) { String[] hosts = fragment.getHosts(); - int[] diskIds = fragment.getDiskIds(); + int[] diskIds = null; + if (fragment instanceof FileFragment) { + diskIds = ((FileFragment)fragment).getDiskIds(); + } for (int i = 0; i < hosts.length; i++) { - dataLocations.add(new DataLocation(hosts[i], diskIds[i])); + dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i])); } } - public void addFragment(FileFragment fragment, boolean useDataLocation) { + public void addFragment(Fragment fragment, boolean useDataLocation) { Set<FragmentProto> fragmentProtos; if (fragMap.containsKey(fragment.getTableName())) { fragmentProtos = fragMap.get(fragment.getTableName()); @@ -336,8 +345,8 @@ public class QueryUnit implements EventHandler<TaskEvent> { totalFragmentNum++; } - public void addFragments(Collection<FileFragment> fragments) { - for (FileFragment eachFragment: fragments) { + public void addFragments(Collection<Fragment> fragments) { + for (Fragment eachFragment: fragments) { addFragment(eachFragment, false); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 55b1895..a240ace 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -45,15 +45,18 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.master.TaskSchedulerContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.Pair; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.FetchImpl; @@ -83,18 +86,20 @@ public class Repartitioner { MasterPlan masterPlan = subQuery.getMasterPlan(); ExecutionBlock execBlock = subQuery.getBlock(); QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext(); - StorageManager storageManager = subQuery.getStorageManager(); ScanNode[] scans = execBlock.getScanNodes(); Path tablePath; - FileFragment[] fragments = new FileFragment[scans.length]; + Fragment[] fragments = new Fragment[scans.length]; long[] stats = new long[scans.length]; // initialize variables from the child operators for (int i = 0; i < scans.length; i++) { TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); if (tableDesc == null) { // if it is a real table stored on storage + FileStorageManager storageManager = + (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + tablePath = storageManager.getTablePath(scans[i].getTableName()); if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) { @@ -107,21 +112,23 @@ public class Repartitioner { } fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); } else { - tablePath = new Path(tableDesc.getPath()); try { stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]); } catch (PlanningException e) { throw new IOException(e); } + StorageManager storageManager = + StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType()); + // if table has no data, storageManager will return empty FileFragment. // So, we need to handle FileFragment by its size. // If we don't check its size, it can cause IndexOutOfBoundsException. - List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), tablePath); + List<Fragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc); if (fileFragments.size() > 0) { fragments[i] = fileFragments.get(0); } else { - fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); + fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST}); } } } @@ -268,14 +275,14 @@ public class Repartitioner { //select intermediate scan and stats ScanNode[] intermediateScans = new ScanNode[largeScanIndexList.size()]; long[] intermediateScanStats = new long[largeScanIndexList.size()]; - FileFragment[] intermediateFragments = new FileFragment[largeScanIndexList.size()]; + Fragment[] intermediateFragments = new Fragment[largeScanIndexList.size()]; int index = 0; for (Integer eachIdx : largeScanIndexList) { intermediateScans[index] = scans[eachIdx]; intermediateScanStats[index] = stats[eachIdx]; intermediateFragments[index++] = fragments[eachIdx]; } - FileFragment[] broadcastFragments = new FileFragment[broadcastIndexList.size()]; + Fragment[] broadcastFragments = new Fragment[broadcastIndexList.size()]; ScanNode[] broadcastScans = new ScanNode[broadcastIndexList.size()]; index = 0; for (Integer eachIdx : broadcastIndexList) { @@ -309,9 +316,9 @@ public class Repartitioner { SubQuery subQuery, ScanNode[] scans, long[] stats, - FileFragment[] fragments, + Fragment[] fragments, ScanNode[] broadcastScans, - FileFragment[] broadcastFragments) throws IOException { + Fragment[] broadcastFragments) throws IOException { MasterPlan masterPlan = subQuery.getMasterPlan(); ExecutionBlock execBlock = subQuery.getBlock(); // The hash map is modeling as follows: @@ -394,7 +401,7 @@ public class Repartitioner { int joinTaskNum = Math.min(maxTaskNum, hashEntries.size()); LOG.info("The determined number of join tasks is " + joinTaskNum); - List<FileFragment> rightFragments = new ArrayList<FileFragment>(); + List<Fragment> rightFragments = new ArrayList<Fragment>(); rightFragments.add(fragments[1]); if (broadcastFragments != null) { @@ -404,14 +411,19 @@ public class Repartitioner { Path[] partitionScanPaths = null; TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { + FileStorageManager storageManager = + (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() - getFragmentsFromPartitionedTable(subQuery.getStorageManager(), eachScan, tableDesc); + getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc); partitionScan.setInputPaths(partitionScanPaths); } else { - Collection<FileFragment> scanFragments = subQuery.getStorageManager().getSplits(eachScan.getCanonicalName(), - tableDesc.getMeta(), tableDesc.getSchema(), new Path(tableDesc.getPath())); + StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), + tableDesc.getMeta().getStoreType()); + Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), + tableDesc, eachScan); if (scanFragments != null) { rightFragments.addAll(scanFragments); } @@ -480,10 +492,10 @@ public class Repartitioner { /** * It creates a number of fragments for all partitions. */ - public static List<FileFragment> getFragmentsFromPartitionedTable(StorageManager sm, + public static List<Fragment> getFragmentsFromPartitionedTable(FileStorageManager sm, ScanNode scan, TableDesc table) throws IOException { - List<FileFragment> fragments = Lists.newArrayList(); + List<Fragment> fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; fragments.addAll(sm.getSplits( scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); @@ -492,7 +504,7 @@ public class Repartitioner { } private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery, - int baseScanId, FileFragment[] fragments) throws IOException { + int baseScanId, Fragment[] fragments) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); @@ -511,23 +523,27 @@ public class Repartitioner { // . add all partition paths to node's inputPaths variable // -> SCAN // . add all fragments to broadcastFragments - Collection<FileFragment> baseFragments = null; - List<FileFragment> broadcastFragments = new ArrayList<FileFragment>(); + Collection<Fragment> baseFragments = null; + List<Fragment> broadcastFragments = new ArrayList<Fragment>(); for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName()); TableMeta meta = desc.getMeta(); - Collection<FileFragment> scanFragments; + Collection<Fragment> scanFragments; Path[] partitionScanPaths = null; if (scan.getType() == NodeType.PARTITIONS_SCAN) { PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() - scanFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc); + FileStorageManager storageManager = + (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc); } else { - scanFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(), - new Path(desc.getPath())); + StorageManager storageManager = + StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType()); + + scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan); } if (scanFragments != null) { @@ -630,46 +646,66 @@ public class Repartitioner { ExecutionBlock execBlock = subQuery.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName()); + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + .getTablePath(scan.getTableName()); ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0); SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT); SortSpec [] sortSpecs = sortNode.getSortKeys(); Schema sortSchema = new Schema(channel.getShuffleKeys()); + TupleRange[] ranges; + int determinedTaskNum; + // calculate the number of maximum query ranges TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); // If there is an empty table in inner join, it should return zero rows. - if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) { + if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) { return; } TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); - RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); - BigInteger card = partitioner.getTotalCardinality(); - // if the number of the range cardinality is less than the desired number of tasks, - // we set the the number of tasks to the number of range cardinality. - int determinedTaskNum; - if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { - LOG.info(subQuery.getId() + ", The range cardinality (" + card - + ") is less then the desired number of tasks (" + maxNum + ")"); - determinedTaskNum = card.intValue(); + if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) { + StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); + CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new IOException("Can't get table meta data from catalog: " + + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); + } + ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType) + .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc, + sortNode.getInSchema(), sortSpecs, + mergedRange); + determinedTaskNum = ranges.length; } else { - determinedTaskNum = maxNum; - } + RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); + BigInteger card = partitioner.getTotalCardinality(); + + // if the number of the range cardinality is less than the desired number of tasks, + // we set the the number of tasks to the number of range cardinality. + if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { + LOG.info(subQuery.getId() + ", The range cardinality (" + card + + ") is less then the desired number of tasks (" + maxNum + ")"); + determinedTaskNum = card.intValue(); + } else { + determinedTaskNum = maxNum; + } - LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + - " sub ranges (total units: " + determinedTaskNum + ")"); - TupleRange [] ranges = partitioner.partition(determinedTaskNum); - if (ranges == null || ranges.length == 0) { - LOG.warn(subQuery.getId() + " no range infos."); - } - TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); - if (LOG.isDebugEnabled()) { - if (ranges != null) { - for (TupleRange eachRange : ranges) { - LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + + " sub ranges (total units: " + determinedTaskNum + ")"); + ranges = partitioner.partition(determinedTaskNum); + if (ranges == null || ranges.length == 0) { + LOG.warn(subQuery.getId() + " no range infos."); + } + TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); + if (LOG.isDebugEnabled()) { + if (ranges != null) { + for (TupleRange eachRange : ranges) { + LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + } } } } @@ -772,14 +808,15 @@ public class Repartitioner { public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, SubQuery subQuery, DataChannel channel, - int maxNum) { + int maxNum) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName()); + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + .getTablePath(scan.getTableName()); - FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); - List<FileFragment> fragments = new ArrayList<FileFragment>(); + Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); + List<Fragment> fragments = new ArrayList<Fragment>(); fragments.add(frag); SubQuery.scheduleFragments(subQuery, fragments); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 39bb7ed..7f05fa4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -36,7 +36,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; @@ -59,10 +59,11 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.history.QueryUnitHistory; @@ -96,7 +97,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private TableStats resultStatistics; private TableStats inputStatistics; private EventHandler<Event> eventHandler; - private final StorageManager sm; private AbstractTaskScheduler taskScheduler; private QueryMasterTask.QueryMasterTaskContext context; private final List<String> diagnostics = new ArrayList<String>(); @@ -286,12 +286,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private AtomicInteger completeReportReceived = new AtomicInteger(0); private SubQueryHistory finalSubQueryHistory; - public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, - ExecutionBlock block, StorageManager sm) { + public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) { this.context = context; this.masterPlan = masterPlan; this.block = block; - this.sm = sm; this.eventHandler = context.getEventHandler(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -509,10 +507,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return this.priority; } - public StorageManager getStorageManager() { - return sm; - } - public ExecutionBlockId getId() { return block.getId(); } @@ -677,14 +671,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0); - // get default or store type - CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting // if store plan (i.e., CREATE or INSERT OVERWRITE) - StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE); - if (storeTableNode != null) { - storeType = storeTableNode.getStorageType(); + StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); + if (storeType == null) { + // get default or store type + storeType = StoreType.CSV; } + schema = channel.getSchema(); meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet()); inputStatistics = statsArray[0]; @@ -1043,7 +1037,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { ScanNode scan = scans[0]; TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName()); - Collection<FileFragment> fragments; + Collection<Fragment> fragments; TableMeta meta = table.getMeta(); // Depending on scanner node's type, it creates fragments. If scan is for @@ -1052,10 +1046,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // span a number of blocks or possibly consists of a number of files. if (scan.getType() == NodeType.PARTITIONS_SCAN) { // After calling this method, partition paths are removed from the physical plan. - fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table); + FileStorageManager storageManager = + (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table); } else { - Path inputPath = new Path(table.getPath()); - fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath); + StorageManager storageManager = + StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType()); + fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan); } SubQuery.scheduleFragments(subQuery, fragments); @@ -1073,27 +1070,27 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) { + public static void scheduleFragment(SubQuery subQuery, Fragment fragment) { subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, subQuery.getId(), fragment)); } - public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) { - for (FileFragment eachFragment : fragments) { + public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> fragments) { + for (Fragment eachFragment : fragments) { scheduleFragment(subQuery, eachFragment); } } - public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments, - Collection<FileFragment> broadcastFragments) { - for (FileFragment eachLeafFragment : leftFragments) { + public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> leftFragments, + Collection<Fragment> broadcastFragments) { + for (Fragment eachLeafFragment : leftFragments) { scheduleFragment(subQuery, eachLeafFragment, broadcastFragments); } } public static void scheduleFragment(SubQuery subQuery, - FileFragment leftFragment, Collection<FileFragment> rightFragments) { + Fragment leftFragment, Collection<Fragment> rightFragments) { subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, subQuery.getId(), leftFragment, rightFragments)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 0cc87fc..f1a9224 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -39,6 +39,7 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.QueryInProgress; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.util.ApplicationIdUtils; +import org.apache.tajo.util.StringUtils; import java.io.IOException; import java.util.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java index 0de1b2b..3147bb6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java @@ -38,7 +38,7 @@ public class IndexUtil { public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) { StringBuilder builder = new StringBuilder(); builder.append(fragment.getPath().getName() + "_"); - builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_"); + builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_"); for(int i = 0 ; i < keys.length ; i ++) { builder.append(keys[i].getSortKey().getSimpleName()+"_"); }
