TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
Closes #262 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3d485ecb Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3d485ecb Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3d485ecb Branch: refs/heads/master Commit: 3d485ecb0112af12258a5a2bdc4a400b8df4fae8 Parents: 0c97fc0 Author: JaeHwa Jung <[email protected]> Authored: Thu Nov 27 18:42:24 2014 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Nov 27 18:45:18 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-core/pom.xml | 1 + .../org/apache/tajo/master/ContainerProxy.java | 10 +- .../tajo/master/DefaultTaskScheduler.java | 18 +- .../tajo/master/LaunchTaskRunnersEvent.java | 5 +- .../apache/tajo/master/LazyTaskScheduler.java | 9 +- .../apache/tajo/master/TajoContainerProxy.java | 22 +- .../apache/tajo/master/TajoMasterService.java | 5 +- .../tajo/master/TaskRunnerGroupEvent.java | 8 +- .../tajo/master/container/TajoContainer.java | 173 ++++++++++ .../tajo/master/container/TajoContainerId.java | 172 ++++++++++ .../master/container/TajoConverterUtils.java | 263 ++++++++++++++++ .../master/container/TajoRecordFactory.java | 31 ++ .../container/TajoRecordFactoryPBImpl.java | 104 ++++++ .../container/TajoRecordFactoryProvider.java | 70 +++++ .../tajo/master/container/TajoRecords.java | 39 +++ .../impl/pb/TajoContainerIdPBImpl.java | 100 ++++++ .../tajo/master/event/LocalTaskEvent.java | 9 +- .../event/QueryUnitAttemptScheduleEvent.java | 10 +- .../event/SubQueryContainerAllocationEvent.java | 8 +- .../master/event/TaskAttemptAssignedEvent.java | 8 +- .../tajo/master/event/TaskRequestEvent.java | 8 +- .../master/querymaster/QueryInProgress.java | 3 +- .../querymaster/QueryMasterManagerService.java | 4 +- .../master/querymaster/QueryUnitAttempt.java | 6 +- .../tajo/master/querymaster/SubQuery.java | 17 +- .../apache/tajo/master/rm/TajoRMContext.java | 8 +- .../tajo/master/rm/TajoWorkerContainer.java | 15 +- .../tajo/master/rm/TajoWorkerContainerId.java | 53 ++-- .../master/rm/TajoWorkerResourceManager.java | 63 ++-- .../tajo/master/rm/WorkerResourceManager.java | 4 +- .../tajo/worker/AbstractResourceAllocator.java | 14 +- .../apache/tajo/worker/ResourceAllocator.java | 6 +- .../tajo/worker/TajoResourceAllocator.java | 101 +++--- .../java/org/apache/tajo/worker/TaskRunner.java | 16 +- .../apache/tajo/worker/TaskRunnerHistory.java | 14 +- .../src/main/proto/ContainerProtocol.proto | 48 +++ .../src/main/proto/QueryMasterProtocol.proto | 3 + .../main/proto/ResourceTrackerProtocol.proto | 3 + .../src/main/proto/TajoMasterProtocol.proto | 7 +- .../src/main/proto/TajoWorkerProtocol.proto | 5 +- .../tajo/master/rm/TestTajoResourceManager.java | 16 +- .../org/apache/tajo/storage/StorageManager.java | 1 + .../apache/tajo/storage/TestFileSystems.java | 267 ++++++++-------- .../java/org/apache/tajo/storage/s3/INode.java | 124 -------- .../storage/s3/InMemoryFileSystemStore.java | 175 ----------- .../apache/tajo/storage/s3/S3OutputStream.java | 234 -------------- .../tajo/storage/s3/SmallBlockS3FileSystem.java | 314 ------------------- 48 files changed, 1392 insertions(+), 1204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6521c1c..1657c20 100644 --- a/CHANGES +++ b/CHANGES @@ -68,6 +68,8 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa) + TAJO-1208: Failure of create table using textfile on hivemeta. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index fce96e4..060ac1b 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -162,6 +162,7 @@ <argument>--proto_path=../tajo-client/src/main/proto</argument> <argument>--proto_path=../tajo-plan/src/main/proto</argument> <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/ContainerProtocol.proto</argument> <argument>src/main/proto/ResourceTrackerProtocol.proto</argument> <argument>src/main/proto/QueryMasterProtocol.proto</argument> <argument>src/main/proto/TajoMasterProtocol.proto</argument> http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java index 59b071a..462de91 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java @@ -22,11 +22,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; public abstract class ContainerProxy { protected static final Log LOG = LogFactory.getLog(ContainerProxy.class); @@ -45,8 +45,8 @@ public abstract class ContainerProxy { protected ContainerState state; // store enough information to be able to cleanup the container - protected Container container; - protected ContainerId containerID; + protected TajoContainer container; + protected TajoContainerId containerID; protected String hostName; protected int port = -1; @@ -54,7 +54,7 @@ public abstract class ContainerProxy { public abstract void stopContainer(); public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, - ExecutionBlockId executionBlockId, Container container) { + ExecutionBlockId executionBlockId, TajoContainer container) { this.context = context; this.conf = conf; this.state = ContainerState.PREP; http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 62d4892..77e3257 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -41,6 +41,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.QueryUnitAttempt; import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; @@ -338,7 +339,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume = Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>()); /** A value is last assigned volume id for each task runner */ - private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>(); + private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId, + Integer>(); /** * A key is disk volume id, and a value is the load of this volume. * This load is measured by counting how many number of tasks are running. @@ -378,7 +380,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { * 2. unknown block or Non-splittable task in host * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null */ - public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) { + public synchronized QueryUnitAttemptId getLocalTask(TajoContainerId containerId) { int volumeId; QueryUnitAttemptId queryUnitAttemptId = null; @@ -489,7 +491,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { * @param volumeId Volume identifier * @return the volume load (i.e., how many running tasks use this volume) */ - private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) { + private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) { int concurrency = 1; if (diskVolumeLoads.containsKey(volumeId)) { @@ -514,7 +516,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { /** * Decrease the count of running tasks of a certain task runner */ - private synchronized void decreaseConcurrency(ContainerId containerId){ + private synchronized void decreaseConcurrency(TajoContainerId containerId){ Integer volumeId = lastAssignedVolumeId.get(containerId); if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){ Integer concurrency = diskVolumeLoads.get(volumeId); @@ -552,11 +554,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - public boolean isAssigned(ContainerId containerId){ + public boolean isAssigned(TajoContainerId containerId){ return lastAssignedVolumeId.containsKey(containerId); } - public boolean isRemote(ContainerId containerId){ + public boolean isRemote(TajoContainerId containerId){ Integer volumeId = lastAssignedVolumeId.get(containerId); if(volumeId == null || volumeId > REMOTE){ return false; @@ -647,7 +649,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>(); - private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){ + private QueryUnitAttemptId allocateLocalTask(String host, TajoContainerId containerId){ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode @@ -778,7 +780,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - ContainerId containerId = taskRequest.getContainerId(); + TajoContainerId containerId = taskRequest.getContainerId(); LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + "containerId=" + containerId); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java index 9a4a01d..e620afa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java @@ -18,9 +18,9 @@ package org.apache.tajo.master; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.container.TajoContainer; import java.util.Collection; @@ -29,7 +29,8 @@ public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent { private final String planJson; public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId, - Collection<Container> containers, QueryContext queryContext, String planJson) { + Collection<TajoContainer> containers, QueryContext queryContext, + String planJson) { super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers); this.queryContext = queryContext; this.planJson = planJson; http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index f7953e0..b2883cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -21,7 +21,6 @@ package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; @@ -38,6 +37,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.QueryUnitAttempt; import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.FetchImpl; @@ -246,7 +246,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } private static class DiskBalancer { - private HashMap<ContainerId, Integer> containerDiskMap = new HashMap<ContainerId, Integer>(); + private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId, + Integer>(); private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>(); private String host; @@ -260,7 +261,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } } - public Integer getDiskId(ContainerId containerId) { + public Integer getDiskId(TajoContainerId containerId) { if (!containerDiskMap.containsKey(containerId)) { assignVolumeId(containerId); } @@ -268,7 +269,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { return containerDiskMap.get(containerId); } - public void assignVolumeId(ContainerId containerId){ + public void assignVolumeId(TajoContainerId containerId){ Map.Entry<Integer, Integer> volumeEntry = null; for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index c236c20..158316e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -20,17 +20,17 @@ package org.apache.tajo.master; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.rpc.NettyClientBase; @@ -47,7 +47,7 @@ public class TajoContainerProxy extends ContainerProxy { private final String planJson; public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context, - Configuration conf, Container container, + Configuration conf, TajoContainer container, QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) { super(context, conf, executionBlockId, container); this.queryContext = queryContext; @@ -89,7 +89,7 @@ public class TajoContainerProxy extends ContainerProxy { } } - private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) { + private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext() @@ -149,8 +149,8 @@ public class TajoContainerProxy extends ContainerProxy { public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlockId executionBlockId, - ContainerId containerId) throws Exception { - List<ContainerId> containerIds = new ArrayList<ContainerId>(); + TajoContainerId containerId) throws Exception { + List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>(); containerIds.add(containerId); releaseWorkerResource(context, executionBlockId, containerIds); @@ -158,11 +158,11 @@ public class TajoContainerProxy extends ContainerProxy { public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlockId executionBlockId, - List<ContainerId> containerIds) throws Exception { - List<YarnProtos.ContainerIdProto> containerIdProtos = - new ArrayList<YarnProtos.ContainerIdProto>(); + List<TajoContainerId> containerIds) throws Exception { + List<ContainerProtocol.TajoContainerIdProto> containerIdProtos = + new ArrayList<ContainerProtocol.TajoContainerIdProto>(); - for(ContainerId eachContainerId: containerIds) { + for(TajoContainerId eachContainerId: containerIds) { containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index ddf24d3..1e3501c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.querymaster.QueryJobManager; @@ -128,9 +129,9 @@ public class TajoMasterService extends AbstractService { public void releaseWorkerResource(RpcController controller, TajoMasterProtocol.WorkerResourceReleaseRequest request, RpcCallback<PrimitiveProtos.BoolProto> done) { - List<YarnProtos.ContainerIdProto> containerIds = request.getContainerIdsList(); + List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList(); - for(YarnProtos.ContainerIdProto eachContainer: containerIds) { + for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) { context.getResourceManager().releaseWorkerResource(eachContainer); } done.run(BOOL_TRUE); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java index 1e6655c..c1c6522 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java @@ -18,10 +18,10 @@ package org.apache.tajo.master; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; +import org.apache.tajo.master.container.TajoContainer; import java.util.Collection; @@ -32,16 +32,16 @@ public class TaskRunnerGroupEvent extends AbstractEvent<EventType> { } protected final ExecutionBlockId executionBlockId; - protected final Collection<Container> containers; + protected final Collection<TajoContainer> containers; public TaskRunnerGroupEvent(EventType eventType, ExecutionBlockId executionBlockId, - Collection<Container> containers) { + Collection<TajoContainer> containers) { super(eventType); this.executionBlockId = executionBlockId; this.containers = containers; } - public Collection<Container> getContainers() { + public Collection<TajoContainer> getContainers() { return containers; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java new file mode 100644 index 0000000..77562b5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.util.Records; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/Container.java + * + * <p><code>TajoContainer</code> represents an allocated resource in the cluster. + * </p> + * + * <p>The <code>ResourceManager</code> is the sole authority to allocate any + * <code>TajoContainer</code> to applications. The allocated <code>TajoContainer</code> + * is always on a single node and has a unique {@link org.apache.tajo.master.container.TajoContainerId}. It has + * a specific amount of {@link org.apache.hadoop.yarn.api.records.Resource} allocated.</p> + * + * <p>It includes details such as: + * <ul> + * <li>{@link org.apache.tajo.master.container.TajoContainerId} for the container, which is globally unique.</li> + * <li> + * {@link org.apache.hadoop.yarn.api.records.NodeId} of the node on which it is allocated. + * </li> + * <li>HTTP uri of the node.</li> + * <li>{@link org.apache.hadoop.yarn.api.records.Resource} allocated to the container.</li> + * <li>{@link org.apache.hadoop.yarn.api.records.Priority} at which the container was allocated.</li> + * <li> + * TajoContainer {@link org.apache.hadoop.yarn.api.records.Token} of the container, used to securely verify + * authenticity of the allocation. + * </li> + * </ul> + * </p> + * + * <p>Typically, an <code>ApplicationMaster</code> receives the + * <code>TajoContainer</code> from the <code>ResourceManager</code> during + * resource-negotiation and then talks to the <code>NodeManager</code> to + * start/stop containers.</p> + * + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) + * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#stopContainers(org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest) + */ +@Public +@Stable +public abstract class TajoContainer implements Comparable<TajoContainer> { + + @Private + @Unstable + public static TajoContainer newInstance(TajoContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken) { + TajoContainer container = Records.newRecord(TajoContainer.class); + container.setId(containerId); + container.setNodeId(nodeId); + container.setNodeHttpAddress(nodeHttpAddress); + container.setResource(resource); + container.setPriority(priority); + container.setContainerToken(containerToken); + return container; + } + + /** + * Get the globally unique identifier for the container. + * @return globally unique identifier for the container + */ + @Public + @Stable + public abstract TajoContainerId getId(); + + @Private + @Unstable + public abstract void setId(TajoContainerId id); + + /** + * Get the identifier of the node on which the container is allocated. + * @return identifier of the node on which the container is allocated + */ + @Public + @Stable + public abstract NodeId getNodeId(); + + @Private + @Unstable + public abstract void setNodeId(NodeId nodeId); + + /** + * Get the http uri of the node on which the container is allocated. + * @return http uri of the node on which the container is allocated + */ + @Public + @Stable + public abstract String getNodeHttpAddress(); + + @Private + @Unstable + public abstract void setNodeHttpAddress(String nodeHttpAddress); + + /** + * Get the <code>Resource</code> allocated to the container. + * @return <code>Resource</code> allocated to the container + */ + @Public + @Stable + public abstract Resource getResource(); + + @Private + @Unstable + public abstract void setResource(Resource resource); + + /** + * Get the <code>Priority</code> at which the <code>TajoContainer</code> was + * allocated. + * @return <code>Priority</code> at which the <code>TajoContainer</code> was + * allocated + */ + @Public + @Stable + public abstract Priority getPriority(); + + @Private + @Unstable + public abstract void setPriority(Priority priority); + + /** + * Get the <code>TajoContainerToken</code> for the container. + * <p><code>TajoContainerToken</code> is the security token used by the framework + * to verify authenticity of any <code>TajoContainer</code>.</p> + * + * <p>The <code>ResourceManager</code>, on container allocation provides a + * secure token which is verified by the <code>NodeManager</code> on + * container launch.</p> + * + * <p>Applications do not need to care about <code>TajoContainerToken</code>, they + * are transparently handled by the framework - the allocated + * <code>TajoContainer</code> includes the <code>TajoContainerToken</code>.</p> + * + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) + * + * @return <code>TajoContainerToken</code> for the container + */ + @Public + @Stable + public abstract Token getContainerToken(); + + @Private + @Unstable + public abstract void setContainerToken(Token containerToken); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java new file mode 100644 index 0000000..0de5fe0 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container; + +import java.text.NumberFormat; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java + * + * <p><code>TajoContainerId</code> represents a globally unique identifier + * for a {@link org.apache.tajo.master.container.TajoContainer} in the cluster.</p> + */ +@Public +@Stable +public abstract class TajoContainerId implements Comparable<TajoContainerId>{ + + @Private + @Unstable + public static TajoContainerId newInstance(ApplicationAttemptId appAttemptId, + int containerId) { + TajoContainerId id = TajoRecords.newRecord(TajoContainerId.class); + id.setId(containerId); + id.setApplicationAttemptId(appAttemptId); + id.build(); + return id; + } + + /** + * Get the <code>ApplicationAttemptId</code> of the application to which the + * <code>Container</code> was assigned. + * <p> + * Note: If containers are kept alive across application attempts via + * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)} + * the <code>TajoContainerId</code> does not necessarily contain the current + * running application attempt's <code>ApplicationAttemptId</code> This + * container can be allocated by previously exited application attempt and + * managed by the current running attempt thus have the previous application + * attempt's <code>ApplicationAttemptId</code>. + * </p> + * + * @return <code>ApplicationAttemptId</code> of the application to which the + * <code>Container</code> was assigned + */ + @Public + @Stable + public abstract ApplicationAttemptId getApplicationAttemptId(); + + @Private + @Unstable + protected abstract void setApplicationAttemptId(ApplicationAttemptId atId); + + /** + * Get the identifier of the <code>TajoContainerId</code>. + * @return identifier of the <code>TajoContainerId</code> + */ + @Public + @Stable + public abstract int getId(); + + @Private + @Unstable + protected abstract void setId(int id); + + + // TODO: fail the app submission if attempts are more than 10 or something + private static final ThreadLocal<NumberFormat> appAttemptIdFormat = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + // TODO: Why thread local? + // ^ NumberFormat instances are not threadsafe + private static final ThreadLocal<NumberFormat> containerIdFormat = + new ThreadLocal<NumberFormat>() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + @Override + public int hashCode() { + // Generated by eclipse. + final int prime = 435569; + int result = 7507; + result = prime * result + getId(); + result = prime * result + getApplicationAttemptId().hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TajoContainerId other = (TajoContainerId) obj; + if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId())) + return false; + if (this.getId() != other.getId()) + return false; + return true; + } + + @Override + public int compareTo(TajoContainerId other) { + if (this.getApplicationAttemptId().compareTo( + other.getApplicationAttemptId()) == 0) { + return this.getId() - other.getId(); + } else { + return this.getApplicationAttemptId().compareTo( + other.getApplicationAttemptId()); + } + + } + + @Override + public String toString() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(4); + + StringBuilder sb = new StringBuilder(); + sb.append("container_"); + ApplicationId appId = getApplicationAttemptId().getApplicationId(); + sb.append(appId.getClusterTimestamp()).append("_"); + sb.append(fmt.format(appId.getId())) + .append("_"); + sb.append( + appAttemptIdFormat.get().format( + getApplicationAttemptId().getAttemptId())).append("_"); + sb.append(containerIdFormat.get().format(getId())); + return sb.toString(); + } + + protected abstract void build(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java new file mode 100644 index 0000000..a6db654 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.master.container; + + +import static org.apache.hadoop.yarn.util.StringHelper._split; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java + * + * This class contains a set of utilities which help converting data structures + * from/to 'serializableFormat' to/from hadoop/nativejava data structures. + * + */ +@Private +public class TajoConverterUtils { + + public static final String APPLICATION_PREFIX = "application"; + public static final String CONTAINER_PREFIX = "container"; + public static final String APPLICATION_ATTEMPT_PREFIX = "appattempt"; + + /** + * return a hadoop path from a given url + * + * @param url + * url to convert + * @return path from {@link URL} + * @throws URISyntaxException + */ + public static Path getPathFromYarnURL(URL url) throws URISyntaxException { + String scheme = url.getScheme() == null ? "" : url.getScheme(); + + String authority = ""; + if (url.getHost() != null) { + authority = url.getHost(); + if (url.getUserInfo() != null) { + authority = url.getUserInfo() + "@" + authority; + } + if (url.getPort() > 0) { + authority += ":" + url.getPort(); + } + } + + return new Path( + (new URI(scheme, authority, url.getFile(), null, null)).normalize()); + } + + /** + * change from CharSequence to string for map key and value + * @param env map for converting + * @return string,string map + */ + public static Map<String, String> convertToString( + Map<CharSequence, CharSequence> env) { + + Map<String, String> stringMap = new HashMap<String, String>(); + for (Entry<CharSequence, CharSequence> entry: env.entrySet()) { + stringMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + return stringMap; + } + + public static URL getYarnUrlFromPath(Path path) { + return getYarnUrlFromURI(path.toUri()); + } + + public static URL getYarnUrlFromURI(URI uri) { + URL url = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(URL.class); + if (uri.getHost() != null) { + url.setHost(uri.getHost()); + } + if (uri.getUserInfo() != null) { + url.setUserInfo(uri.getUserInfo()); + } + url.setPort(uri.getPort()); + url.setScheme(uri.getScheme()); + url.setFile(uri.getPath()); + return url; + } + + public static String toString(ApplicationId appId) { + return appId.toString(); + } + + public static ApplicationId toApplicationId(RecordFactory recordFactory, + String appIdStr) { + Iterator<String> it = _split(appIdStr).iterator(); + it.next(); // prefix. TODO: Validate application prefix + return toApplicationId(recordFactory, it); + } + + private static ApplicationId toApplicationId(RecordFactory recordFactory, + Iterator<String> it) { + ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()), + Integer.parseInt(it.next())); + return appId; + } + + private static ApplicationAttemptId toApplicationAttemptId( + Iterator<String> it) throws NumberFormatException { + ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()), + Integer.parseInt(it.next())); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next())); + return appAttemptId; + } + + private static ApplicationId toApplicationId( + Iterator<String> it) throws NumberFormatException { + ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()), + Integer.parseInt(it.next())); + return appId; + } + + public static String toString(TajoContainerId cId) { + return cId == null ? null : cId.toString(); + } + + public static NodeId toNodeId(String nodeIdStr) { + String[] parts = nodeIdStr.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid NodeId [" + nodeIdStr + + "]. Expected host:port"); + } + try { + NodeId nodeId = + NodeId.newInstance(parts[0], Integer.parseInt(parts[1])); + return nodeId; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid port: " + parts[1], e); + } + } + + public static TajoContainerId toTajoContainerId(String containerIdStr) { + Iterator<String> it = _split(containerIdStr).iterator(); + if (!it.next().equals(CONTAINER_PREFIX)) { + throw new IllegalArgumentException("Invalid TajoContainerId prefix: " + + containerIdStr); + } + try { + ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); + TajoContainerId containerId = + TajoContainerId.newInstance(appAttemptID, Integer.parseInt(it.next())); + return containerId; + } catch (NumberFormatException n) { + throw new IllegalArgumentException("Invalid TajoContainerId: " + + containerIdStr, n); + } + } + + public static ApplicationAttemptId toApplicationAttemptId( + String applicationAttmeptIdStr) { + Iterator<String> it = _split(applicationAttmeptIdStr).iterator(); + if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) { + throw new IllegalArgumentException("Invalid AppAttemptId prefix: " + + applicationAttmeptIdStr); + } + try { + return toApplicationAttemptId(it); + } catch (NumberFormatException n) { + throw new IllegalArgumentException("Invalid AppAttemptId: " + + applicationAttmeptIdStr, n); + } + } + + public static ApplicationId toApplicationId( + String appIdStr) { + Iterator<String> it = _split(appIdStr).iterator(); + if (!it.next().equals(APPLICATION_PREFIX)) { + throw new IllegalArgumentException("Invalid ApplicationId prefix: " + + appIdStr + ". The valid ApplicationId should start with prefix " + + APPLICATION_PREFIX); + } + try { + return toApplicationId(it); + } catch (NumberFormatException n) { + throw new IllegalArgumentException("Invalid AppAttemptId: " + + appIdStr, n); + } + } + + /** + * Convert a protobuf token into a rpc token and set its service. Supposed + * to be used for tokens other than RMDelegationToken. For + * RMDelegationToken, use + * {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token, + * org.apache.hadoop.io.Text)} instead. + * + * @param protoToken the yarn token + * @param serviceAddr the connect address for the service + * @return rpc token + */ + public static <T extends TokenIdentifier> Token<T> convertFromYarn( + org.apache.hadoop.yarn.api.records.Token protoToken, + InetSocketAddress serviceAddr) { + Token<T> token = new Token<T>(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + if (serviceAddr != null) { + SecurityUtil.setTokenService(token, serviceAddr); + } + return token; + } + + /** + * Convert a protobuf token into a rpc token and set its service. + * + * @param protoToken the yarn token + * @param service the service for the token + */ + public static <T extends TokenIdentifier> Token<T> convertFromYarn( + org.apache.hadoop.yarn.api.records.Token protoToken, + Text service) { + Token<T> token = new Token<T>(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + + if (service != null) { + token.setService(service); + } + return token; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java new file mode 100644 index 0000000..2fd8697 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container; + +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/factories/RecordFactory.java + * + */ +@Unstable +public interface TajoRecordFactory { + public <T> T newRecordInstance(Class<T> clazz); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java new file mode 100644 index 0000000..c352a28 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.tajo.master.container; + + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.tajo.master.container.TajoRecordFactory; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RecordFactoryPBImpl.java + */ +@Private +public class TajoRecordFactoryPBImpl implements TajoRecordFactory { + + private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb"; + private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl"; + + private static final TajoRecordFactoryPBImpl self = new TajoRecordFactoryPBImpl(); + private Configuration localConf = new Configuration(); + private ConcurrentMap<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<Class<?>, Constructor<?>>(); + + private TajoRecordFactoryPBImpl() { + } + + public static TajoRecordFactory get() { + return self; + } + + @SuppressWarnings("unchecked") + @Override + public <T> T newRecordInstance(Class<T> clazz) { + + Constructor<?> constructor = cache.get(clazz); + if (constructor == null) { + Class<?> pbClazz = null; + try { + pbClazz = localConf.getClassByName(getPBImplClassName(clazz)); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Failed to load class: [" + + getPBImplClassName(clazz) + "]", e); + } + try { + constructor = pbClazz.getConstructor(); + constructor.setAccessible(true); + cache.putIfAbsent(clazz, constructor); + } catch (NoSuchMethodException e) { + throw new YarnRuntimeException("Could not find 0 argument constructor", e); + } + } + try { + Object retObject = constructor.newInstance(); + return (T)retObject; + } catch (InvocationTargetException e) { + throw new YarnRuntimeException(e); + } catch (IllegalAccessException e) { + throw new YarnRuntimeException(e); + } catch (InstantiationException e) { + throw new YarnRuntimeException(e); + } + } + + private String getPBImplClassName(Class<?> clazz) { + String srcPackagePart = getPackageName(clazz); + String srcClassName = getClassName(clazz); + String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX; + String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX; + return destPackagePart + "." + destClassPart; + } + + private String getClassName(Class<?> clazz) { + String fqName = clazz.getName(); + return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length())); + } + + private String getPackageName(Class<?> clazz) { + return clazz.getPackage().getName(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java new file mode 100644 index 0000000..c260e85 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container; + + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java + */ +@Unstable +public class TajoRecordFactoryProvider { + private static Configuration defaultConf; + + static { + defaultConf = new Configuration(); + } + + private TajoRecordFactoryProvider() { + } + + public static TajoRecordFactory getRecordFactory(Configuration conf) { + if (conf == null) { + //Assuming the default configuration has the correct factories set. + //Users can specify a particular factory by providing a configuration. + conf = defaultConf; + } + return (TajoRecordFactory) getFactoryClassInstance(TajoRecordFactoryPBImpl.class.getCanonicalName()); + } + + private static Object getFactoryClassInstance(String factoryClassName) { + try { + Class<?> clazz = Class.forName(factoryClassName); + Method method = clazz.getMethod("get", null); + method.setAccessible(true); + return method.invoke(null, null); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException(e); + } catch (NoSuchMethodException e) { + throw new YarnRuntimeException(e); + } catch (InvocationTargetException e) { + throw new YarnRuntimeException(e); + } catch (IllegalAccessException e) { + throw new YarnRuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java new file mode 100644 index 0000000..e85edf9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container; + + +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/util/Records.java + * + * Convenient API record utils + */ +@Unstable +public class TajoRecords { + // The default record factory + private static final TajoRecordFactory factory = + TajoRecordFactoryProvider.getRecordFactory(null); + + public static <T> T newRecord(Class<T> cls) { + return factory.newRecordInstance(cls); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java new file mode 100644 index 0000000..9d31050 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.container.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; + +import com.google.common.base.Preconditions; +import org.apache.tajo.ipc.ContainerProtocol; +import org.apache.tajo.master.container.TajoContainerId; + +/** + * This class is borrowed from the following source code : + * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java + * + */ +@Private +@Unstable +public class TajoContainerIdPBImpl extends TajoContainerId { + ContainerProtocol.TajoContainerIdProto proto = null; + ContainerProtocol.TajoContainerIdProto.Builder builder = null; + private ApplicationAttemptId applicationAttemptId = null; + + public TajoContainerIdPBImpl() { + builder = ContainerProtocol.TajoContainerIdProto.newBuilder(); + } + + public TajoContainerIdPBImpl(ContainerProtocol.TajoContainerIdProto proto) { + this.proto = proto; + this.applicationAttemptId = convertFromProtoFormat(proto.getAppAttemptId()); + } + + public ContainerProtocol.TajoContainerIdProto getProto() { + return proto; + } + + @Override + public int getId() { + Preconditions.checkNotNull(proto); + return proto.getId(); + } + + @Override + protected void setId(int id) { + Preconditions.checkNotNull(builder); + builder.setId((id)); + } + + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return this.applicationAttemptId; + } + + @Override + protected void setApplicationAttemptId(ApplicationAttemptId atId) { + if (atId != null) { + Preconditions.checkNotNull(builder); + builder.setAppAttemptId(convertToProtoFormat(atId)); + } + this.applicationAttemptId = atId; + } + + private ApplicationAttemptIdPBImpl convertFromProtoFormat( + ApplicationAttemptIdProto p) { + return new ApplicationAttemptIdPBImpl(p); + } + + private ApplicationAttemptIdProto convertToProtoFormat( + ApplicationAttemptId t) { + return ((ApplicationAttemptIdPBImpl)t).getProto(); + } + + @Override + protected void build() { + proto = builder.build(); + builder = null; + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java index 92e6695..cab2202 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java @@ -18,18 +18,19 @@ package org.apache.tajo.master.event; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.master.container.TajoContainerId; /** * This event is sent to a running TaskAttempt on a worker. */ public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> { private final QueryUnitAttemptId taskAttemptId; - private final ContainerId containerId; + private final TajoContainerId containerId; - public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) { + public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, TajoContainerId containerId, + LocalTaskEventType eventType) { super(eventType); this.taskAttemptId = taskAttemptId; this.containerId = containerId; @@ -39,7 +40,7 @@ public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> { return taskAttemptId; } - public ContainerId getContainerId() { + public TajoContainerId getContainerId() { return containerId; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java index a2acc7e..6e0d9fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java @@ -19,10 +19,10 @@ package org.apache.tajo.master.event; import com.google.protobuf.RpcCallback; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; import org.apache.tajo.master.querymaster.QueryUnitAttempt; +import org.apache.tajo.master.container.TajoContainerId; public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent { private final QueryUnitAttemptScheduleContext context; @@ -44,7 +44,7 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent { } public static class QueryUnitAttemptScheduleContext { - private ContainerId containerId; + private TajoContainerId containerId; private String host; private RpcCallback<QueryUnitRequestProto> callback; @@ -52,7 +52,7 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent { } - public QueryUnitAttemptScheduleContext(ContainerId containerId, + public QueryUnitAttemptScheduleContext(TajoContainerId containerId, String host, RpcCallback<QueryUnitRequestProto> callback) { this.containerId = containerId; @@ -60,11 +60,11 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent { this.callback = callback; } - public ContainerId getContainerId() { + public TajoContainerId getContainerId() { return containerId; } - public void setContainerId(ContainerId containerId) { + public void setContainerId(TajoContainerId containerId) { this.containerId = containerId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java index a8f4800..e617d53 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java @@ -18,21 +18,21 @@ package org.apache.tajo.master.event; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.master.container.TajoContainer; import java.util.List; public class SubQueryContainerAllocationEvent extends SubQueryEvent { - private List<Container> allocatedContainer; + private List<TajoContainer> allocatedContainer; public SubQueryContainerAllocationEvent(final ExecutionBlockId id, - List<Container> allocatedContainer) { + List<TajoContainer> allocatedContainer) { super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED); this.allocatedContainer = allocatedContainer; } - public List<Container> getAllocatedContainer() { + public List<TajoContainer> getAllocatedContainer() { return this.allocatedContainer; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java index e0928c5..3b9edcb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java @@ -18,22 +18,22 @@ package org.apache.tajo.master.event; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.container.TajoContainerId; public class TaskAttemptAssignedEvent extends TaskAttemptEvent { - private final ContainerId cId; + private final TajoContainerId cId; private final WorkerConnectionInfo workerConnectionInfo; - public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId, + public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId, WorkerConnectionInfo connectionInfo) { super(id, TaskAttemptEventType.TA_ASSIGNED); this.cId = cId; this.workerConnectionInfo = connectionInfo; } - public ContainerId getContainerId() { + public TajoContainerId getContainerId() { return cId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java index 2197c33..9e8e3dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java @@ -19,11 +19,11 @@ package org.apache.tajo.master.event; import com.google.protobuf.RpcCallback; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType; +import org.apache.tajo.master.container.TajoContainerId; public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { @@ -32,13 +32,13 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { } private final int workerId; - private final ContainerId containerId; + private final TajoContainerId containerId; private final ExecutionBlockId executionBlockId; private final RpcCallback<QueryUnitRequestProto> callback; public TaskRequestEvent(int workerId, - ContainerId containerId, + TajoContainerId containerId, ExecutionBlockId executionBlockId, RpcCallback<QueryUnitRequestProto> callback) { super(TaskRequestEventType.TASK_REQ); @@ -52,7 +52,7 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { return this.workerId; } - public ContainerId getContainerId() { + public TajoContainerId getContainerId() { return this.containerId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index d949ca4..e361c7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -74,7 +75,7 @@ public class QueryInProgress extends CompositeService { private QueryMasterProtocolService queryMasterRpcClient; - private YarnProtos.ContainerIdProto qmContainerId; + private ContainerProtocol.TajoContainerIdProto qmContainerId; public QueryInProgress( TajoMaster.MasterContext masterContext, http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index f953995..f4bd8a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; @@ -33,6 +32,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.LazyTaskScheduler; import org.apache.tajo.master.event.*; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.session.Session; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; @@ -130,7 +130,7 @@ public class QueryMasterManagerService extends CompositeService if(queryMasterTask == null || queryMasterTask.isStopped()) { done.run(LazyTaskScheduler.stopTaskRunnerReq); } else { - ContainerId cid = + TajoContainerId cid = queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); LOG.debug("getTask:" + cid + ", ebId:" + ebId); queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done)); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index db6f130..d88173f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -21,7 +21,6 @@ package org.apache.tajo.master.querymaster; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; import org.apache.tajo.QueryUnitAttemptId; @@ -35,6 +34,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.master.querymaster.QueryUnit.PullHost; +import org.apache.tajo.master.container.TajoContainerId; import java.util.ArrayList; import java.util.EnumSet; @@ -55,7 +55,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { private final QueryUnit queryUnit; final EventHandler eventHandler; - private ContainerId containerId; + private TajoContainerId containerId; private WorkerConnectionInfo workerConnectionInfo; private int expire; @@ -214,7 +214,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { return this.workerConnectionInfo; } - public void setContainerId(ContainerId containerId) { + public void setContainerId(TajoContainerId containerId) { this.containerId = containerId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 476da04..39bb7ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -24,7 +24,6 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,6 +57,8 @@ import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.StorageManager; @@ -105,7 +106,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private long finishTime; volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>(); - volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>(); + volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId, + TajoContainer>(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); @@ -663,13 +665,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values())); } - public void releaseContainer(ContainerId containerId) { - // try to kill the container. - ArrayList<Container> list = new ArrayList<Container>(); - list.add(containers.get(containerId)); - eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list)); - } - /** * It computes all stats and sets the intermediate result. */ @@ -1129,8 +1124,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { try { SubQueryContainerAllocationEvent allocationEvent = (SubQueryContainerAllocationEvent) event; - for (Container container : allocationEvent.getAllocatedContainer()) { - ContainerId cId = container.getId(); + for (TajoContainer container : allocationEvent.getAllocatedContainer()) { + TajoContainerId cId = container.getId(); if (subQuery.containers.containsKey(cId)) { subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), "Duplicated containers are allocated: " + cId.toString())); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java index 5d07ff2..bb8cc12 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java @@ -21,14 +21,13 @@ package org.apache.tajo.master.rm; import com.google.common.collect.Maps; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.tajo.QueryId; +import org.apache.tajo.ipc.ContainerProtocol; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; - /** * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager. */ @@ -43,7 +42,8 @@ public class TajoRMContext { private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap(); /** map between queryIds and query master ContainerId */ - private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap(); + private final ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> qmContainerMap = Maps + .newConcurrentMap(); private final Set<Integer> liveQueryMasterWorkerResources = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); @@ -77,7 +77,7 @@ public class TajoRMContext { * * @return The Map for query master containers */ - public ConcurrentMap<QueryId, ContainerIdProto> getQueryMasterContainer() { + public ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> getQueryMasterContainer() { return qmContainerMap; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java index 4d6cbd2..3d28d85 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java @@ -19,9 +19,12 @@ package org.apache.tajo.master.rm; import org.apache.hadoop.yarn.api.records.*; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; -public class TajoWorkerContainer extends Container { - ContainerId id; + +public class TajoWorkerContainer extends TajoContainer { + TajoContainerId id; NodeId nodeId; Worker worker; @@ -34,12 +37,12 @@ public class TajoWorkerContainer extends Container { } @Override - public ContainerId getId() { + public TajoContainerId getId() { return id; } @Override - public void setId(ContainerId id) { + public void setId(TajoContainerId id) { this.id = id; } @@ -94,7 +97,7 @@ public class TajoWorkerContainer extends Container { } @Override - public int compareTo(Container container) { - return 0; //To change body of implemented methods use File | Settings | File Templates. + public int compareTo(TajoContainer container) { + return getId().compareTo(container.getId()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java index 634ad2b..184de71 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java @@ -19,10 +19,11 @@ package org.apache.tajo.master.rm; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.tajo.ipc.ContainerProtocol; +import org.apache.tajo.master.container.TajoContainerId; -public class TajoWorkerContainerId extends ContainerId { +public class TajoWorkerContainerId extends TajoContainerId { ApplicationAttemptId applicationAttemptId; int id; @@ -46,43 +47,43 @@ public class TajoWorkerContainerId extends ContainerId { this.id = id; } - public YarnProtos.ContainerIdProto getProto() { + public ContainerProtocol.TajoContainerIdProto getProto() { YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder() - .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp()) - .setId(applicationAttemptId.getApplicationId().getId()) - .build(); + .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp()) + .setId(applicationAttemptId.getApplicationId().getId()) + .build(); YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder() - .setAttemptId(applicationAttemptId.getAttemptId()) - .setApplicationId(appIdProto) - .build(); + .setAttemptId(applicationAttemptId.getAttemptId()) + .setApplicationId(appIdProto) + .build(); - return YarnProtos.ContainerIdProto.newBuilder() - .setAppAttemptId(attemptIdProto) - .setAppId(appIdProto) - .setId(id) - .build(); + return ContainerProtocol.TajoContainerIdProto.newBuilder() + .setAppAttemptId(attemptIdProto) + .setAppId(appIdProto) + .setId(id) + .build(); } - public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) { + public static ContainerProtocol.TajoContainerIdProto getContainerIdProto(TajoContainerId containerId) { if(containerId instanceof TajoWorkerContainerId) { return ((TajoWorkerContainerId)containerId).getProto(); } else { YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder() - .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp()) - .setId(containerId.getApplicationAttemptId().getApplicationId().getId()) - .build(); + .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp()) + .setId(containerId.getApplicationAttemptId().getApplicationId().getId()) + .build(); YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder() - .setAttemptId(containerId.getApplicationAttemptId().getAttemptId()) - .setApplicationId(appIdProto) - .build(); + .setAttemptId(containerId.getApplicationAttemptId().getAttemptId()) + .setApplicationId(appIdProto) + .build(); - return YarnProtos.ContainerIdProto.newBuilder() - .setAppAttemptId(attemptIdProto) - .setAppId(appIdProto) - .setId(containerId.getId()) - .build(); + return ContainerProtocol.TajoContainerIdProto.newBuilder() + .setAppAttemptId(attemptIdProto) + .setAppId(appIdProto) + .setId(containerId.getId()) + .build(); } }
