TAJO-1288: Refactoring org.apache.tajo.master package. Closes #338
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c29c1cb Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c29c1cb Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c29c1cb Branch: refs/heads/master Commit: 1c29c1cb4bd0e2d75954575717cb5cf05875fe51 Parents: a1e0328 Author: Hyunsik Choi <[email protected]> Authored: Fri Jan 9 00:31:54 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jan 9 00:31:54 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/engine/query/QueryContext.java | 2 +- .../main/java/org/apache/tajo/ha/HAService.java | 56 + .../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 +++++ .../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ++ .../tajo/master/AbstractTaskScheduler.java | 56 - .../org/apache/tajo/master/ContainerProxy.java | 2 +- .../tajo/master/DefaultTaskScheduler.java | 928 ------------ .../apache/tajo/master/FetchScheduleEvent.java | 40 - .../org/apache/tajo/master/FragmentPair.java | 73 - .../org/apache/tajo/master/GlobalEngine.java | 2 +- .../NonForwardQueryResultFileScanner.java | 164 --- .../master/NonForwardQueryResultScanner.java | 46 - .../NonForwardQueryResultSystemScanner.java | 616 -------- .../java/org/apache/tajo/master/QueryInfo.java | 235 +++ .../org/apache/tajo/master/QueryJobManager.java | 311 ++++ .../apache/tajo/master/ScheduledFetches.java | 49 - .../apache/tajo/master/TajoContainerProxy.java | 2 +- .../java/org/apache/tajo/master/TajoMaster.java | 11 +- .../tajo/master/TajoMasterClientService.java | 13 +- .../apache/tajo/master/TajoMasterService.java | 2 - .../tajo/master/TaskSchedulerContext.java | 65 - .../tajo/master/TaskSchedulerFactory.java | 69 - .../tajo/master/event/QueryCompletedEvent.java | 2 +- .../tajo/master/event/QueryStartEvent.java | 2 +- .../tajo/master/event/StageCompletedEvent.java | 2 +- .../event/TaskAttemptToSchedulerEvent.java | 2 +- 
.../apache/tajo/master/exec/DDLExecutor.java | 1 - .../exec/NonForwardQueryResultFileScanner.java | 164 +++ .../exec/NonForwardQueryResultScanner.java | 46 + .../NonForwardQueryResultSystemScanner.java | 616 ++++++++ .../apache/tajo/master/exec/QueryExecutor.java | 9 +- .../org/apache/tajo/master/ha/HAService.java | 56 - .../tajo/master/ha/HAServiceHDFSImpl.java | 318 ----- .../apache/tajo/master/ha/TajoMasterInfo.java | 89 -- .../master/metrics/CatalogMetricsGaugeSet.java | 56 - .../metrics/WorkerResourceMetricsGaugeSet.java | 74 - .../apache/tajo/master/querymaster/Query.java | 738 ---------- .../master/querymaster/QueryInProgress.java | 300 ---- .../tajo/master/querymaster/QueryInfo.java | 235 --- .../tajo/master/querymaster/QueryJobEvent.java | 45 - .../master/querymaster/QueryJobManager.java | 310 ---- .../tajo/master/querymaster/QueryMaster.java | 631 -------- .../querymaster/QueryMasterManagerService.java | 263 ---- .../master/querymaster/QueryMasterRunner.java | 149 -- .../master/querymaster/QueryMasterTask.java | 638 --------- .../tajo/master/querymaster/Repartitioner.java | 1251 ---------------- .../apache/tajo/master/querymaster/Stage.java | 1342 ------------------ .../tajo/master/querymaster/StageState.java | 30 - .../apache/tajo/master/querymaster/Task.java | 907 ------------ .../tajo/master/querymaster/TaskAttempt.java | 443 ------ .../master/rm/TajoWorkerResourceManager.java | 3 +- .../tajo/master/rm/WorkerResourceManager.java | 2 +- .../master/scheduler/QuerySchedulingInfo.java | 55 + .../apache/tajo/master/scheduler/Scheduler.java | 41 + .../master/scheduler/SchedulingAlgorithms.java | 47 + .../master/scheduler/SimpleFifoScheduler.java | 147 ++ .../master/session/InvalidSessionException.java | 25 - .../session/NoSuchSessionVariableException.java | 25 - .../org/apache/tajo/master/session/Session.java | 196 --- .../tajo/master/session/SessionConstants.java | 23 - .../tajo/master/session/SessionEvent.java | 34 - 
.../tajo/master/session/SessionEventType.java | 24 - .../session/SessionLivelinessMonitor.java | 53 - .../tajo/master/session/SessionManager.java | 144 -- .../tajo/metrics/CatalogMetricsGaugeSet.java | 56 + .../metrics/WorkerResourceMetricsGaugeSet.java | 74 + .../tajo/querymaster/AbstractTaskScheduler.java | 56 + .../tajo/querymaster/DefaultTaskScheduler.java | 926 ++++++++++++ .../tajo/querymaster/FetchScheduleEvent.java | 40 + .../java/org/apache/tajo/querymaster/Query.java | 738 ++++++++++ .../tajo/querymaster/QueryInProgress.java | 301 ++++ .../apache/tajo/querymaster/QueryJobEvent.java | 46 + .../apache/tajo/querymaster/QueryMaster.java | 631 ++++++++ .../querymaster/QueryMasterManagerService.java | 262 ++++ .../tajo/querymaster/QueryMasterTask.java | 638 +++++++++ .../apache/tajo/querymaster/Repartitioner.java | 1250 ++++++++++++++++ .../java/org/apache/tajo/querymaster/Stage.java | 1342 ++++++++++++++++++ .../org/apache/tajo/querymaster/StageState.java | 30 + .../java/org/apache/tajo/querymaster/Task.java | 897 ++++++++++++ .../apache/tajo/querymaster/TaskAttempt.java | 443 ++++++ .../tajo/querymaster/TaskSchedulerContext.java | 65 + .../tajo/querymaster/TaskSchedulerFactory.java | 68 + .../tajo/scheduler/QuerySchedulingInfo.java | 55 - .../org/apache/tajo/scheduler/Scheduler.java | 41 - .../tajo/scheduler/SchedulingAlgorithms.java | 47 - .../tajo/scheduler/SimpleFifoScheduler.java | 147 -- .../tajo/session/InvalidSessionException.java | 25 + .../session/NoSuchSessionVariableException.java | 25 + .../java/org/apache/tajo/session/Session.java | 196 +++ .../apache/tajo/session/SessionConstants.java | 23 + .../org/apache/tajo/session/SessionEvent.java | 34 + .../apache/tajo/session/SessionEventType.java | 24 + .../tajo/session/SessionLivelinessMonitor.java | 53 + .../org/apache/tajo/session/SessionManager.java | 144 ++ .../main/java/org/apache/tajo/util/JSPUtil.java | 10 +- .../apache/tajo/util/history/HistoryReader.java | 2 +- 
.../apache/tajo/util/history/HistoryWriter.java | 2 +- .../java/org/apache/tajo/worker/FetchImpl.java | 4 +- .../tajo/worker/TajoResourceAllocator.java | 6 +- .../java/org/apache/tajo/worker/TajoWorker.java | 6 +- .../tajo/worker/TajoWorkerClientService.java | 2 +- tajo-core/src/main/resources/tajo-default.xml | 2 +- .../resources/webapps/admin/catalogview.jsp | 2 +- .../main/resources/webapps/admin/cluster.jsp | 4 +- .../src/main/resources/webapps/admin/index.jsp | 6 +- .../src/main/resources/webapps/admin/query.jsp | 4 +- .../resources/webapps/admin/query_executor.jsp | 2 +- .../src/main/resources/webapps/worker/index.jsp | 4 +- .../resources/webapps/worker/querydetail.jsp | 4 +- .../main/resources/webapps/worker/queryplan.jsp | 6 +- .../resources/webapps/worker/querytasks.jsp | 2 +- .../src/main/resources/webapps/worker/task.jsp | 8 +- .../apache/tajo/LocalTajoTestingUtility.java | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 8 +- .../tajo/engine/planner/TestLogicalPlanner.java | 2 +- .../planner/physical/TestPhysicalPlanner.java | 2 +- .../tajo/engine/query/TestGroupByQuery.java | 8 +- .../tajo/engine/query/TestJoinBroadcast.java | 2 +- .../tajo/engine/query/TestTablePartitions.java | 2 +- .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 153 ++ .../TestNonForwardQueryResultSystemScanner.java | 4 +- .../apache/tajo/master/TestRepartitioner.java | 8 +- .../tajo/master/ha/TestHAServiceHDFSImpl.java | 158 --- .../querymaster/TestIntermediateEntry.java | 53 - .../tajo/master/querymaster/TestKillQuery.java | 125 -- .../master/querymaster/TestQueryProgress.java | 75 - .../querymaster/TestTaskStatusUpdate.java | 194 --- .../master/scheduler/TestFifoScheduler.java | 116 ++ .../tajo/querymaster/TestIntermediateEntry.java | 53 + .../apache/tajo/querymaster/TestKillQuery.java | 125 ++ .../tajo/querymaster/TestQueryProgress.java | 75 + .../tajo/querymaster/TestTaskStatusUpdate.java | 194 +++ .../tajo/scheduler/TestFifoScheduler.java | 116 -- 
.../java/org/apache/tajo/util/TestJSPUtil.java | 2 +- .../util/history/TestHistoryWriterReader.java | 2 +- .../org/apache/tajo/worker/TestHistory.java | 2 +- tajo-dist/pom.xml | 8 +- 138 files changed, 11317 insertions(+), 11612 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7783db8..96b63ea 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1288: Refactoring org.apache.tajo.master package. (hyunsik) + TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 493ca6e..7b3c00d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -25,7 +25,7 @@ import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; import org.apache.tajo.plan.logical.NodeType; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java 
b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java new file mode 100644 index 0000000..1329223 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ha; + +import java.io.IOException; +import java.util.List; + +/** + * The HAService is responsible for setting active TajoMaster on startup or when the + * current active is changing (eg due to failure), monitoring the health of TajoMaster. + * + */ +public interface HAService { + + /** + * Add master name to shared storage. + */ + public void register() throws IOException; + + + /** + * Delete master name to shared storage. + * + */ + public void delete() throws IOException; + + /** + * + * @return True if current master is an active master. 
+ */ + public boolean isActiveStatus(); + + /** + * + * @return return all master list + * @throws IOException + */ + public List<TajoMasterInfo> getMasters() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java new file mode 100644 index 0000000..e18a9b2 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +/** + * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster. + * + */ +public class HAServiceHDFSImpl implements HAService { + private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class); + + private MasterContext context; + private TajoConf conf; + + private FileSystem fs; + + private String masterName; + private Path rootPath; + private Path haPath; + private Path activePath; + private Path backupPath; + + private boolean isActiveStatus = false; + + //thread which runs periodically to see the last time since a heartbeat is received. 
+ private Thread checkerThread; + private volatile boolean stopped = false; + + private int monitorInterval; + + private String currentActiveMaster; + + public HAServiceHDFSImpl(MasterContext context) throws IOException { + this.context = context; + this.conf = context.getConf(); + initSystemDirectory(); + + InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress(); + this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); + + monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); + } + + private void initSystemDirectory() throws IOException { + // Get Tajo root dir + this.rootPath = TajoConf.getTajoRootDir(conf); + + // Check Tajo root dir + this.fs = rootPath.getFileSystem(conf); + + // Check and create Tajo system HA dir + haPath = TajoConf.getSystemHADir(conf); + if (!fs.exists(haPath)) { + fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA dir '" + haPath + "' is created"); + } + + activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + if (!fs.exists(activePath)) { + fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA Active dir '" + activePath + "' is created"); + } + + backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + if (!fs.exists(backupPath)) { + fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA Backup dir '" + backupPath + "' is created"); + } + } + + private void startPingChecker() { + if (checkerThread == null) { + checkerThread = new Thread(new PingChecker()); + checkerThread.setName("Ping Checker"); + checkerThread.start(); + } + } + + @Override + public void register() throws IOException { + FileStatus[] files = fs.listStatus(activePath); + + // Phase 1: If there is not another active master, this try to become active master. 
+ if (files.length == 0) { + createMasterFile(true); + currentActiveMaster = masterName; + LOG.info(String.format("This is added to active master (%s)", masterName)); + } else { + // Phase 2: If there is active master information, we need to check its status. + Path activePath = files[0].getPath(); + currentActiveMaster = activePath.getName().replaceAll("_", ":"); + + // Phase 3: If current active master is dead, this master should be active master. + if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) { + fs.delete(activePath, true); + createMasterFile(true); + currentActiveMaster = masterName; + LOG.info(String.format("This is added to active master (%s)", masterName)); + } else { + // Phase 4: If current active master is alive, this master need to be backup master. + createMasterFile(false); + LOG.info(String.format("This is added to backup masters (%s)", masterName)); + } + } + } + + private void createMasterFile(boolean isActive) throws IOException { + String fileName = masterName.replaceAll(":", "_"); + Path path = null; + + if (isActive) { + path = new Path(activePath, fileName); + } else { + path = new Path(backupPath, fileName); + } + + StringBuilder sb = new StringBuilder(); + InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.CATALOG_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); + + FSDataOutputStream out = fs.create(path); + + try { + out.writeUTF(sb.toString()); + out.hflush(); + 
out.close(); + } catch (FileAlreadyExistsException e) { + createMasterFile(false); + } + + if (isActive) { + isActiveStatus = true; + } else { + isActiveStatus = false; + } + + startPingChecker(); + } + + + private InetSocketAddress getHostAddress(int type) { + InetSocketAddress address = null; + + switch (type) { + case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS); + break; + case HAConstants.MASTER_CLIENT_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_CLIENT_RPC_ADDRESS); + break; + case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS); + break; + case HAConstants.CATALOG_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .CATALOG_ADDRESS); + break; + case HAConstants.MASTER_INFO_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_INFO_ADDRESS); + default: + break; + } + + return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort()); + } + + @Override + public void delete() throws IOException { + String fileName = masterName.replaceAll(":", "_"); + + Path activeFile = new Path(activePath, fileName); + if (fs.exists(activeFile)) { + fs.delete(activeFile, true); + } + + Path backupFile = new Path(backupPath, fileName); + if (fs.exists(backupFile)) { + fs.delete(backupFile, true); + } + if (isActiveStatus) { + isActiveStatus = false; + } + stopped = true; + } + + @Override + public boolean isActiveStatus() { + return isActiveStatus; + } + + @Override + public List<TajoMasterInfo> getMasters() throws IOException { + List<TajoMasterInfo> list = TUtil.newList(); + Path path = null; + + FileStatus[] files = fs.listStatus(activePath); + if (files.length == 1) { + path = files[0].getPath(); + list.add(createTajoMasterInfo(path, true)); + } + + files = 
fs.listStatus(backupPath); + for (FileStatus status : files) { + path = status.getPath(); + list.add(createTajoMasterInfo(path, false)); + } + + return list; + } + + private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException { + String masterAddress = path.getName().replaceAll("_", ":"); + boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf); + + FSDataInputStream stream = fs.open(path); + String data = stream.readUTF(); + + stream.close(); + + String[] addresses = data.split("_"); + TajoMasterInfo info = new TajoMasterInfo(); + + info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress)); + info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0])); + info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1])); + info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2])); + info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3])); + + info.setAvailable(isAlive); + info.setActive(isActive); + + return info; + } + + private class PingChecker implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (HAServiceHDFSImpl.this) { + try { + if (!currentActiveMaster.equals(masterName)) { + boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); + if (LOG.isDebugEnabled()) { + LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName + + ", isAlive:" + isAlive); + } + + // If active master is dead, this master should be active master instead of + // previous active master. 
+ if (!isAlive) { + FileStatus[] files = fs.listStatus(activePath); + if (files.length == 0 || (files.length == 1 + && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { + delete(); + register(); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(monitorInterval); + } catch (InterruptedException e) { + LOG.info("PingChecker interrupted. - masterName:" + masterName); + break; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java new file mode 100644 index 0000000..c6fdd40 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ha; + +import java.net.InetSocketAddress; + +public class TajoMasterInfo { + + private boolean available; + private boolean isActive; + + private InetSocketAddress tajoMasterAddress; + private InetSocketAddress tajoClientAddress; + private InetSocketAddress workerResourceTrackerAddr; + private InetSocketAddress catalogAddress; + private InetSocketAddress webServerAddress; + + public InetSocketAddress getTajoMasterAddress() { + return tajoMasterAddress; + } + + public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { + this.tajoMasterAddress = tajoMasterAddress; + } + + public InetSocketAddress getTajoClientAddress() { + return tajoClientAddress; + } + + public void setTajoClientAddress(InetSocketAddress tajoClientAddress) { + this.tajoClientAddress = tajoClientAddress; + } + + public InetSocketAddress getWorkerResourceTrackerAddr() { + return workerResourceTrackerAddr; + } + + public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { + this.workerResourceTrackerAddr = workerResourceTrackerAddr; + } + + public InetSocketAddress getCatalogAddress() { + return catalogAddress; + } + + public void setCatalogAddress(InetSocketAddress catalogAddress) { + this.catalogAddress = catalogAddress; + } + + public InetSocketAddress getWebServerAddress() { + return webServerAddress; + } + + public void setWebServerAddress(InetSocketAddress webServerAddress) { + this.webServerAddress = webServerAddress; + } + + public boolean isAvailable() { + return available; + } + + public void setAvailable(boolean available) { + this.available = available; + } + + public boolean isActive() { + return isActive; + } + + public void setActive(boolean active) { + isActive = active; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java deleted file mode 100644 index 320a5aa..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.master.event.TaskRequestEvent; -import org.apache.tajo.master.event.TaskSchedulerEvent; - - -public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> { - - protected int hostLocalAssigned; - protected int rackLocalAssigned; - protected int totalAssigned; - - /** - * Construct the service. 
- * - * @param name service name - */ - public AbstractTaskScheduler(String name) { - super(name); - } - - public int getHostLocalAssigned() { - return hostLocalAssigned; - } - - public int getRackLocalAssigned() { - return rackLocalAssigned; - } - - public int getTotalAssigned() { - return totalAssigned; - } - - public abstract void handleTaskRequestEvent(TaskRequestEvent event); - public abstract int remainingScheduledObjectNum(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java index 462de91..562790d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java deleted file mode 100644 index d47c93a..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ /dev/null @@ -1,928 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - 
* or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.TaskRequest; -import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; -import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.Stage; -import org.apache.tajo.master.querymaster.Task; -import 
org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.plan.serder.LogicalNodeSerializer; -import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.storage.DataLocation; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.worker.FetchImpl; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class DefaultTaskScheduler extends AbstractTaskScheduler { - private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); - - private final TaskSchedulerContext context; - private Stage stage; - - private Thread schedulingThread; - private AtomicBoolean stopEventHandling = new AtomicBoolean(false); - - private ScheduledRequests scheduledRequests; - private TaskRequests taskRequests; - - private int nextTaskId = 0; - private int scheduledObjectNum = 0; - - public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { - super(DefaultTaskScheduler.class.getName()); - this.context = context; - this.stage = stage; - } - - @Override - public void init(Configuration conf) { - - scheduledRequests = new ScheduledRequests(); - taskRequests = new TaskRequests(); - - super.init(conf); - } - - @Override - public void start() { - LOG.info("Start TaskScheduler"); - - this.schedulingThread = new Thread() { - public void run() { - - while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) { - try { - synchronized (schedulingThread){ - schedulingThread.wait(100); - } - schedule(); - } catch (InterruptedException e) { - break; - } catch (Throwable e) { - LOG.fatal(e.getMessage(), e); - break; - } - } - LOG.info("TaskScheduler schedulingThread stopped"); - } - }; - - 
this.schedulingThread.start(); - super.start(); - } - - private static final TaskAttemptId NULL_ATTEMPT_ID; - public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; - static { - ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); - - TajoWorkerProtocol.TaskRequestProto.Builder builder = - TajoWorkerProtocol.TaskRequestProto.newBuilder(); - builder.setId(NULL_ATTEMPT_ID.getProto()); - builder.setShouldDie(true); - builder.setOutputTable(""); - builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); - builder.setClusteredOutput(false); - stopTaskRunnerReq = builder.build(); - } - - @Override - public void stop() { - if(stopEventHandling.getAndSet(true)){ - return; - } - - if (schedulingThread != null) { - synchronized (schedulingThread) { - schedulingThread.notifyAll(); - } - } - - // Return all of request callbacks instantly. - if(taskRequests != null){ - for (TaskRequestEvent req : taskRequests.taskRequestQueue) { - req.getCallback().run(stopTaskRunnerReq); - } - } - - LOG.info("Task Scheduler stopped"); - super.stop(); - } - - private Fragment[] fragmentsForNonLeafTask; - private Fragment[] broadcastFragmentsForNonLeafTask; - - LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); - public void schedule() { - - if (taskRequests.size() > 0) { - if (scheduledRequests.leafTaskNum() > 0) { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", LeafTask Schedule Request: " + - scheduledRequests.leafTaskNum()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledRequests.leafTaskNum()); - LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents "); - if (taskRequestEvents.size() > 0) { - scheduledRequests.assignToLeafTasks(taskRequestEvents); - taskRequestEvents.clear(); - } - } - } - - if (taskRequests.size() > 0) { - if 
(scheduledRequests.nonLeafTaskNum() > 0) { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", NonLeafTask Schedule Request: " + - scheduledRequests.nonLeafTaskNum()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledRequests.nonLeafTaskNum()); - scheduledRequests.assignToNonLeafTasks(taskRequestEvents); - taskRequestEvents.clear(); - } - } - } - - @Override - public void handle(TaskSchedulerEvent event) { - if (event.getType() == EventType.T_SCHEDULE) { - if (event instanceof FragmentScheduleEvent) { - FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; - if (context.isLeafQuery()) { - TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); - Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); - task.addFragment(castEvent.getLeftFragment(), true); - scheduledObjectNum++; - if (castEvent.hasRightFragments()) { - task.addFragments(castEvent.getRightFragments()); - } - stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); - } else { - fragmentsForNonLeafTask = new FileFragment[2]; - fragmentsForNonLeafTask[0] = castEvent.getLeftFragment(); - if (castEvent.hasRightFragments()) { - FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{}); - fragmentsForNonLeafTask[1] = rightFragments[0]; - if (rightFragments.length > 1) { - broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1]; - System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length); - } else { - broadcastFragmentsForNonLeafTask = null; - } - } - } - } else if (event instanceof FetchScheduleEvent) { - FetchScheduleEvent castEvent = (FetchScheduleEvent) event; - Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); - TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); - Task task = Stage.newEmptyTask(context, 
taskScheduleContext, stage, nextTaskId++); - scheduledObjectNum++; - for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { - task.addFetches(eachFetch.getKey(), eachFetch.getValue()); - task.addFragment(fragmentsForNonLeafTask[0], true); - if (fragmentsForNonLeafTask[1] != null) { - task.addFragment(fragmentsForNonLeafTask[1], true); - } - } - if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) { - task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); - } - stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); - } else if (event instanceof TaskAttemptToSchedulerEvent) { - TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; - if (context.isLeafQuery()) { - scheduledRequests.addLeafTask(castEvent); - } else { - scheduledRequests.addNonLeafTask(castEvent); - } - } - } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { - // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler. - // This event is triggered by TaskAttempt. - TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; - scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); - LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName()); - ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle( - new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED)); - } - } - - @Override - public void handleTaskRequestEvent(TaskRequestEvent event) { - - taskRequests.handle(event); - int hosts = scheduledRequests.leafTaskHostMapping.size(); - - // if available cluster resource are large then tasks, the scheduler thread are working immediately. 
- if(remainingScheduledObjectNum() > 0 && - (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){ - synchronized (schedulingThread){ - schedulingThread.notifyAll(); - } - } - } - - @Override - public int remainingScheduledObjectNum() { - return scheduledObjectNum; - } - - private class TaskRequests implements EventHandler<TaskRequestEvent> { - private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue = - new LinkedBlockingQueue<TaskRequestEvent>(); - - @Override - public void handle(TaskRequestEvent event) { - if(LOG.isDebugEnabled()){ - LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId()); - } - - if(stopEventHandling.get()) { - event.getCallback().run(stopTaskRunnerReq); - return; - } - int qSize = taskRequestQueue.size(); - if (qSize != 0 && qSize % 1000 == 0) { - LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize); - } - int remCapacity = taskRequestQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue " - + "of DefaultTaskScheduler: " + remCapacity); - } - - taskRequestQueue.add(event); - } - - public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests, - int num) { - taskRequestQueue.drainTo(taskRequests, num); - } - - public int size() { - return taskRequestQueue.size(); - } - } - - /** - * One worker can have multiple running task runners. <code>HostVolumeMapping</code> - * describes various information for one worker, including : - * <ul> - * <li>host name</li> - * <li>rack name</li> - * <li>unassigned tasks for each disk volume</li> - * <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li> - * <li>the number of running tasks for each volume</li> - * </ul>, each task runner and the concurrency number of running tasks for volumes. - * - * Here, we identifier a task runner by {@link ContainerId}, and we use volume ids to identify - * all disks in this node. 
Actually, each volume is only used to distinguish disks, and we don't - * know a certain volume id indicates a certain disk. If you want to know volume id, please read the below section. - * - * <h3>Volume id</h3> - * Volume id is an integer. Each volume id identifies each disk volume. - * - * This volume id can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}. * - * HDFS cannot give any volume id due to unknown reason and disabled config 'dfs.client.file-block-locations.enabled'. - * In this case, the volume id will be -1 or other native integer. - * - * <h3>See Also</h3> - * <ul> - * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li> - * </ul> - */ - public class HostVolumeMapping { - private final String host; - private final String rack; - /** A key is disk volume, and a value is a list of tasks to be scheduled. */ - private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume = - Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>()); - /** A value is last assigned volume id for each task runner */ - private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId, - Integer>(); - /** - * A key is disk volume id, and a value is the load of this volume. - * This load is measured by counting how many number of tasks are running. - * - * These disk volumes are kept in an order of ascending order of the volume id. - * In other words, the head volume ids are likely to -1, meaning no given volume id. 
- */ - private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>(); - /** The total number of remain tasks in this host */ - private AtomicInteger remainTasksNum = new AtomicInteger(0); - public static final int REMOTE = -2; - - - public HostVolumeMapping(String host, String rack){ - this.host = host; - this.rack = rack; - } - - public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){ - synchronized (unassignedTaskForEachVolume){ - LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); - if (list == null) { - list = new LinkedHashSet<TaskAttempt>(); - unassignedTaskForEachVolume.put(volumeId, list); - } - list.add(attemptId); - } - - remainTasksNum.incrementAndGet(); - - if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0); - } - - /** - * Priorities - * 1. a task list in a volume of host - * 2. unknown block or Non-splittable task in host - * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. 
so it will be null - */ - public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) { - int volumeId; - TaskAttemptId taskAttemptId = null; - - if (!lastAssignedVolumeId.containsKey(containerId)) { - volumeId = getLowestVolumeId(); - increaseConcurrency(containerId, volumeId); - } else { - volumeId = lastAssignedVolumeId.get(containerId); - } - - if (unassignedTaskForEachVolume.size() > 0) { - int retry = unassignedTaskForEachVolume.size(); - do { - //clean and get a remaining local task - taskAttemptId = getAndRemove(volumeId); - if(!unassignedTaskForEachVolume.containsKey(volumeId)) { - decreaseConcurrency(containerId); - if (volumeId > REMOTE) { - diskVolumeLoads.remove(volumeId); - } - } - - if (taskAttemptId == null) { - //reassign next volume - volumeId = getLowestVolumeId(); - increaseConcurrency(containerId, volumeId); - retry--; - } else { - break; - } - } while (retry > 0); - } else { - this.remainTasksNum.set(0); - } - return taskAttemptId; - } - - public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) { - TaskAttemptId taskAttemptId = null; - - if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) { - int retry = unassignedTaskForEachVolume.size(); - do { - //clean and get a remaining task - int volumeId = getLowestVolumeId(); - taskAttemptId = getAndRemove(volumeId); - if (taskAttemptId == null) { - if (volumeId > REMOTE) { - diskVolumeLoads.remove(volumeId); - } - retry--; - } else { - break; - } - } while (retry > 0); - } - return taskAttemptId; - } - - private synchronized TaskAttemptId getAndRemove(int volumeId){ - TaskAttemptId taskAttemptId = null; - if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId; - - LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); - if(list != null && list.size() > 0){ - TaskAttempt taskAttempt; - synchronized (unassignedTaskForEachVolume) { - Iterator<TaskAttempt> iterator = list.iterator(); - taskAttempt = 
iterator.next(); - iterator.remove(); - } - - this.remainTasksNum.getAndDecrement(); - taskAttemptId = taskAttempt.getId(); - for (DataLocation location : taskAttempt.getTask().getDataLocations()) { - if (!this.getHost().equals(location.getHost())) { - HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); - if (volumeMapping != null) { - volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); - } - } - } - } - - if(list == null || list.isEmpty()) { - unassignedTaskForEachVolume.remove(volumeId); - } - return taskAttemptId; - } - - private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){ - if(!unassignedTaskForEachVolume.containsKey(volumeId)) return; - - LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); - - if(tasks != null && tasks.size() > 0){ - tasks.remove(taskAttempt); - remainTasksNum.getAndDecrement(); - } else { - unassignedTaskForEachVolume.remove(volumeId); - } - } - - /** - * Increase the count of running tasks and disk loads for a certain task runner. 
- * - * @param containerId The task runner identifier - * @param volumeId Volume identifier - * @return the volume load (i.e., how many running tasks use this volume) - */ - private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) { - - int concurrency = 1; - if (diskVolumeLoads.containsKey(volumeId)) { - concurrency = diskVolumeLoads.get(volumeId) + 1; - } - - if (volumeId > -1) { - LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); - } else if (volumeId == -1) { - // this case is disabled namenode block meta or compressed text file or amazon s3 - LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); - } else if (volumeId == REMOTE) { - // this case has processed all block on host and it will be assigned to remote - LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() - + ", Remote Concurrency : " + concurrency); - } - diskVolumeLoads.put(volumeId, concurrency); - lastAssignedVolumeId.put(containerId, volumeId); - return concurrency; - } - - /** - * Decrease the count of running tasks of a certain task runner - */ - private synchronized void decreaseConcurrency(TajoContainerId containerId){ - Integer volumeId = lastAssignedVolumeId.get(containerId); - if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){ - Integer concurrency = diskVolumeLoads.get(volumeId); - if(concurrency > 0){ - diskVolumeLoads.put(volumeId, concurrency - 1); - } else { - if (volumeId > REMOTE) { - diskVolumeLoads.remove(volumeId); - } - } - } - lastAssignedVolumeId.remove(containerId); - } - - /** - * volume of a host : 0 ~ n - * compressed task, amazon s3, unKnown volume : -1 - * remote task : -2 - */ - public int getLowestVolumeId(){ - Map.Entry<Integer, Integer> volumeEntry = null; - - for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) { - if(volumeEntry == null) volumeEntry = entry; - - if 
(volumeEntry.getValue() >= entry.getValue()) { - volumeEntry = entry; - } - } - - if(volumeEntry != null){ - return volumeEntry.getKey(); - } else { - return REMOTE; - } - } - - public boolean isAssigned(TajoContainerId containerId){ - return lastAssignedVolumeId.containsKey(containerId); - } - - public boolean isRemote(TajoContainerId containerId){ - Integer volumeId = lastAssignedVolumeId.get(containerId); - if(volumeId == null || volumeId > REMOTE){ - return false; - } else { - return true; - } - } - - public int getRemoteConcurrency(){ - return getVolumeConcurrency(REMOTE); - } - - public int getVolumeConcurrency(int volumeId){ - Integer size = diskVolumeLoads.get(volumeId); - if(size == null) return 0; - else return size; - } - - public int getRemainingLocalTaskSize(){ - return remainTasksNum.get(); - } - - public String getHost() { - - return host; - } - - public String getRack() { - return rack; - } - } - - private class ScheduledRequests { - // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in - // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner - // if the task is not included in leafTasks and nonLeafTasks. 
- private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); - private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); - private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap(); - private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); - - private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) { - TaskAttempt taskAttempt = event.getTaskAttempt(); - List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); - - for (DataLocation location : locations) { - String host = location.getHost(); - - HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); - if (hostVolumeMapping == null) { - String rack = RackResolver.resolve(host).getNetworkLocation(); - hostVolumeMapping = new HostVolumeMapping(host, rack); - leafTaskHostMapping.put(host, hostVolumeMapping); - } - hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); - - if (LOG.isDebugEnabled()) { - LOG.debug("Added attempt req to host " + host); - } - - HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); - if (list == null) { - list = new HashSet<TaskAttemptId>(); - leafTasksRackMapping.put(hostVolumeMapping.getRack(), list); - } - - list.add(taskAttempt.getId()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack()); - } - } - - leafTasks.add(taskAttempt.getId()); - } - - private void addNonLeafTask(TaskAttemptToSchedulerEvent event) { - nonLeafTasks.add(event.getTaskAttempt().getId()); - } - - public int leafTaskNum() { - return leafTasks.size(); - } - - public int nonLeafTaskNum() { - return nonLeafTasks.size(); - } - - public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>(); - - private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){ - HostVolumeMapping 
hostVolumeMapping = leafTaskHostMapping.get(host); - - if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode - for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) { - TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId); - - if(attemptId == null) break; - //find remaining local task - if (leafTasks.contains(attemptId)) { - leafTasks.remove(attemptId); - //LOG.info(attemptId + " Assigned based on host match " + hostName); - hostLocalAssigned++; - totalAssigned++; - return attemptId; - } - } - } - return null; - } - - private TaskAttemptId allocateRackTask(String host) { - - List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values()); - String rack = RackResolver.resolve(host).getNetworkLocation(); - TaskAttemptId attemptId = null; - - if (remainingTasks.size() > 0) { - synchronized (scheduledRequests) { - //find largest remaining task of other host in rack - Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() { - @Override - public int compare(HostVolumeMapping v1, HostVolumeMapping v2) { - // descending remaining tasks - if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) { - return 1; - } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) { - return 0; - } else { - return -1; - } - } - }); - } - - for (HostVolumeMapping tasks : remainingTasks) { - for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) { - TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack); - - if (tId == null) break; - - if (leafTasks.contains(tId)) { - leafTasks.remove(tId); - attemptId = tId; - break; - } - } - if(attemptId != null) break; - } - } - - //find task in rack - if (attemptId == null) { - HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack); - if (list != null) { - synchronized (list) { - Iterator<TaskAttemptId> iterator = list.iterator(); - while (iterator.hasNext()) { - TaskAttemptId tId = iterator.next(); - iterator.remove(); - if 
(leafTasks.contains(tId)) { - leafTasks.remove(tId); - attemptId = tId; - break; - } - } - } - } - } - - if (attemptId != null) { - rackLocalAssigned++; - totalAssigned++; - - LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s", - hostLocalAssigned, rackLocalAssigned, totalAssigned, - ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); - - } - return attemptId; - } - - public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { - Collections.shuffle(taskRequests); - LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>(); - - TaskRequestEvent taskRequest; - while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) { - taskRequest = taskRequests.pollFirst(); - if(taskRequest == null) { // if there are only remote task requests - taskRequest = remoteTaskRequests.pollFirst(); - } - - // checking if this container is still alive. - // If not, ignore the task request and stop the task runner - ContainerProxy container = context.getMasterContext().getResourceAllocator() - .getContainer(taskRequest.getContainerId()); - if(container == null) { - taskRequest.getCallback().run(stopTaskRunnerReq); - continue; - } - - // getting the hostname of requested node - WorkerConnectionInfo connectionInfo = - context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId()); - String host = connectionInfo.getHost(); - - // if there are no worker matched to the hostname a task request - if(!leafTaskHostMapping.containsKey(host)){ - String normalizedHost = NetUtils.normalizeHost(host); - - if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){ - // this case means one of either cases: - // * there are no blocks which reside in this node. - // * all blocks which reside in this node are consumed, and this task runner requests a remote task. 
- // In this case, we transfer the task request to the remote task request list, and skip the followings. - remoteTaskRequests.add(taskRequest); - continue; - } - } - - TajoContainerId containerId = taskRequest.getContainerId(); - LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + - "containerId=" + containerId); - - ////////////////////////////////////////////////////////////////////// - // disk or host-local allocation - ////////////////////////////////////////////////////////////////////// - TaskAttemptId attemptId = allocateLocalTask(host, containerId); - - if (attemptId == null) { // if a local task cannot be found - HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); - - if(hostVolumeMapping != null) { - if(!hostVolumeMapping.isRemote(containerId)){ - // assign to remote volume - hostVolumeMapping.decreaseConcurrency(containerId); - hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE); - } - // this part is remote concurrency management of a tail tasks - int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1); - - if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){ - //release container - hostVolumeMapping.decreaseConcurrency(containerId); - taskRequest.getCallback().run(stopTaskRunnerReq); - continue; - } - } - - ////////////////////////////////////////////////////////////////////// - // rack-local allocation - ////////////////////////////////////////////////////////////////////// - attemptId = allocateRackTask(host); - - ////////////////////////////////////////////////////////////////////// - // random node allocation - ////////////////////////////////////////////////////////////////////// - if (attemptId == null && leafTaskNum() > 0) { - synchronized (leafTasks){ - attemptId = leafTasks.iterator().next(); - leafTasks.remove(attemptId); - rackLocalAssigned++; - totalAssigned++; - LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), 
Locality: %.2f%%,", - hostLocalAssigned, rackLocalAssigned, totalAssigned, - ((double) hostLocalAssigned / (double) totalAssigned) * 100)); - } - } - } - - if (attemptId != null) { - Task task = stage.getTask(attemptId.getTaskId()); - TaskRequest taskAssign = new TaskRequestImpl( - attemptId, - new ArrayList<FragmentProto>(task.getAllFragments()), - "", - false, - LogicalNodeSerializer.serialize(task.getLogicalPlan()), - context.getMasterContext().getQueryContext(), - stage.getDataChannel(), stage.getBlock().getEnforcer()); - if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { - taskAssign.setInterQuery(); - } - - context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), connectionInfo)); - assignedRequest.add(attemptId); - - scheduledObjectNum--; - taskRequest.getCallback().run(taskAssign.getProto()); - } else { - throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); - } - } - } - - private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) { - if (masterPlan.isRoot(block)) { - return false; - } - - ExecutionBlock parent = masterPlan.getParent(block); - if (masterPlan.isRoot(parent) && parent.hasUnion()) { - return false; - } - - return true; - } - - public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { - Collections.shuffle(taskRequests); - - TaskRequestEvent taskRequest; - while (!taskRequests.isEmpty()) { - taskRequest = taskRequests.pollFirst(); - LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); - - TaskAttemptId attemptId; - // random allocation - if (nonLeafTasks.size() > 0) { - synchronized (nonLeafTasks){ - attemptId = nonLeafTasks.iterator().next(); - nonLeafTasks.remove(attemptId); - } - LOG.debug("Assigned based on * match"); - - Task task; - task = stage.getTask(attemptId.getTaskId()); - TaskRequest taskAssign = new TaskRequestImpl( - attemptId, - Lists.newArrayList(task.getAllFragments()), - 
"", - false, - LogicalNodeSerializer.serialize(task.getLogicalPlan()), - context.getMasterContext().getQueryContext(), - stage.getDataChannel(), - stage.getBlock().getEnforcer()); - if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { - taskAssign.setInterQuery(); - } - for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { - Collection<FetchImpl> fetches = entry.getValue(); - if (fetches != null) { - for (FetchImpl fetch : fetches) { - taskAssign.addFetch(entry.getKey(), fetch); - } - } - } - - WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator(). - getWorkerConnectionInfo(taskRequest.getWorkerId()); - context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), connectionInfo)); - taskRequest.getCallback().run(taskAssign.getProto()); - totalAssigned++; - scheduledObjectNum--; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java deleted file mode 100644 index 21e376c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.event.TaskSchedulerEvent; -import org.apache.tajo.worker.FetchImpl; - -import java.util.List; -import java.util.Map; - -public class FetchScheduleEvent extends TaskSchedulerEvent { - private final Map<String, List<FetchImpl>> fetches; - - public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, - final Map<String, List<FetchImpl>> fetches) { - super(eventType, blockId); - this.fetches = fetches; - } - - public Map<String, List<FetchImpl>> getFetches() { - return fetches; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java deleted file mode 100644 index 827386b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import com.google.common.base.Objects; -import org.apache.tajo.storage.fragment.Fragment; - -/** - * FragmentPair consists of two fragments, a left fragment and a right fragment. - * According to queries, it can have the different values. - * For join queries, it is assumed to have both fragments. - * Also, the left fragment is assumed to be a fragment of the larger table. - * For other queries, it is assumed to have only a left fragment. - */ -public class FragmentPair { - private Fragment leftFragment; - private Fragment rightFragment; - - public FragmentPair(Fragment left) { - this.leftFragment = left; - } - - public FragmentPair(Fragment left, Fragment right) { - this.leftFragment = left; - this.rightFragment = right; - } - - public Fragment getLeftFragment() { - return leftFragment; - } - - public Fragment getRightFragment() { - return rightFragment; - } - - @Override - public boolean equals(Object o) { - if (o instanceof FragmentPair) { - FragmentPair other = (FragmentPair) o; - boolean eq = this.leftFragment.equals(other.leftFragment); - if (this.rightFragment != null && other.rightFragment != null) { - eq &= this.rightFragment.equals(other.rightFragment); - } else if (this.rightFragment == null && other.rightFragment == null) { - eq &= true; - } else { - return false; - } - return eq; - } - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(leftFragment, rightFragment); - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 51964f0..9d853a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -39,7 +39,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.DDLExecutor; import org.apache.tajo.master.exec.QueryExecutor; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; import org.apache.tajo.plan.*; import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.LogicalRootNode; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java deleted file mode 100644 index d6ea459..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master;

import com.google.protobuf.ByteString;

import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link NonForwardQueryResultScanner} that streams a table's rows directly
 * from its file fragments via a {@link SeqScanExec}. Fragments are pulled from
 * the storage manager in batches of at most {@link #MAX_FRAGMENT_NUM_PER_SCAN};
 * when one batch is exhausted the scan executor is closed and re-created over
 * the next batch, until the storage manager returns no more fragments or the
 * overall {@code maxRow} cap is reached.
 */
public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
  // Upper bound on fragments requested from the storage manager per batch.
  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;

  private QueryId queryId;
  private String sessionId;
  // Current scan executor; null when no batch is open or the scan is finished.
  private SeqScanExec scanExec;
  private TableDesc tableDesc;
  // Encodes tuples to bytes for the client; built over the table's logical schema.
  private RowStoreEncoder rowEncoder;
  // Hard cap on the total number of rows this scanner will ever return.
  private int maxRow;
  // Rows returned so far, accumulated across all getNextRows() calls.
  private int currentNumRows;
  private TaskAttemptContext taskContext;
  private TajoConf tajoConf;
  private ScanNode scanNode;

  // Offset of the next fragment batch to request from the storage manager.
  private int currentFragmentIndex = 0;

  /**
   * Creates a scanner over {@code tableDesc} for the given session/query.
   * Call {@link #init()} before fetching rows.
   *
   * @param maxRow total row cap for this scanner
   */
  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
                                          TableDesc tableDesc, int maxRow) throws IOException {
    this.tajoConf = tajoConf;
    this.sessionId = sessionId;
    this.queryId = queryId;
    this.scanNode = scanNode;
    this.tableDesc = tableDesc;
    this.maxRow = maxRow;
    this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
  }

  /** Opens the first fragment batch. */
  public void init() throws IOException {
    initSeqScanExec();
  }

  /**
   * Requests the next batch of fragments and, if any were returned, builds and
   * initializes a fresh SeqScanExec over them. Leaves {@code scanExec} null
   * when the storage manager has no more fragments.
   */
  private void initSeqScanExec() throws IOException {
    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);

    if (fragments != null && !fragments.isEmpty()) {
      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
      this.taskContext = new TaskAttemptContext(
          new QueryContext(tajoConf), null,
          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
          fragmentProtos, null);
      try {
        // scanNode must be clone cause SeqScanExec change target in the case of
        // a partitioned table.
        scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
      } catch (CloneNotSupportedException e) {
        throw new IOException(e.getMessage(), e);
      }
      scanExec.init();
      currentFragmentIndex += fragments.size();
    }
  }

  public QueryId getQueryId() {
    return queryId;
  }

  public String getSessionId() {
    return sessionId;
  }

  // NOTE(review): allows external injection of the executor; presumably used
  // by tests or callers that pre-build the scan — confirm before removing.
  public void setScanExec(SeqScanExec scanExec) {
    this.scanExec = scanExec;
  }

  public TableDesc getTableDesc() {
    return tableDesc;
  }

  /** Closes the current scan executor, if any; safe to call more than once. */
  public void close() throws Exception {
    if (scanExec != null) {
      scanExec.close();
      scanExec = null;
    }
  }

  /**
   * Returns up to {@code fetchRowNum} encoded rows. When the current fragment
   * batch is exhausted mid-call, the executor is transparently re-created over
   * the next batch. Returns an empty list once the scan is finished (either no
   * more fragments or {@code maxRow} reached — {@code scanExec} is null then).
   */
  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
    List<ByteString> rows = new ArrayList<ByteString>();
    if (scanExec == null) {
      // Scan already finished (or never initialized).
      return rows;
    }
    int rowCount = 0;
    while (true) {
      Tuple tuple = scanExec.next();
      if (tuple == null) {
        // Current batch exhausted: close it and try to open the next one.
        scanExec.close();
        scanExec = null;
        initSeqScanExec();
        if (scanExec != null) {
          tuple = scanExec.next();
        }
        if (tuple == null) {
          // The fresh batch was empty too (or none existed): scan is done.
          if (scanExec != null) {
            scanExec.close();
            scanExec = null;
          }
          break;
        }
      }
      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
      rowCount++;
      currentNumRows++;
      if (rowCount >= fetchRowNum) {
        // Page full; keep scanExec open for the next call.
        break;
      }
      if (currentNumRows >= maxRow) {
        // Overall cap reached; shut the scan down permanently.
        scanExec.close();
        scanExec = null;
        break;
      }
    }
    return rows;
  }

  @Override
  public Schema getLogicalSchema() {
    return tableDesc.getLogicalSchema();
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master;

import java.io.IOException;
import java.util.List;

import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;

import com.google.protobuf.ByteString;

/**
 * A forward-only cursor over a query's result rows: clients fetch rows
 * incrementally in one direction with no rewind.
 *
 * <p>Interface members are implicitly {@code public}; the redundant modifiers
 * the original declared on every member have been dropped (JLS §9.4).
 */
public interface NonForwardQueryResultScanner {

  /** Releases all resources held by this scanner. */
  void close() throws Exception;

  /** @return the logical schema of the rows produced by this scanner */
  Schema getLogicalSchema();

  /**
   * Returns up to {@code fetchRowNum} encoded rows; an empty list signals
   * that the scan is exhausted.
   */
  List<ByteString> getNextRows(int fetchRowNum) throws IOException;

  /** @return the id of the query whose result is being scanned */
  QueryId getQueryId();

  /** @return the id of the client session that owns this scanner */
  String getSessionId();

  /** @return the descriptor of the table backing the result */
  TableDesc getTableDesc();

  /** Prepares the scanner; presumably called once before fetching rows — confirm with implementations. */
  void init() throws IOException;

}
