http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 0e3ccad..0cc87fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.QueryInProgress; @@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import static org.apache.tajo.ipc.TajoMasterProtocol.*; @@ -80,7 +80,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke private TajoConf systemConf; - private ConcurrentMap<ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps.newConcurrentMap(); + private ConcurrentMap<ContainerProtocol.TajoContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps + .newConcurrentMap(); /** It receives status messages from workers and their resources. 
*/ private TajoResourceTracker resourceTracker; @@ -194,7 +195,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) { float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar( - TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT); + TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT); int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB); WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder(); @@ -235,7 +236,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke return resource; } - private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) { + private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) { rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId); } @@ -256,9 +257,9 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke WorkerResourceAllocationRequest request; RpcCallback<WorkerResourceAllocationResponse> callBack; WorkerResourceRequest( - QueryId queryId, - boolean queryMasterRequest, WorkerResourceAllocationRequest request, - RpcCallback<WorkerResourceAllocationResponse> callBack) { + QueryId queryId, + boolean queryMasterRequest, WorkerResourceAllocationRequest request, + RpcCallback<WorkerResourceAllocationResponse> callBack) { this.queryId = queryId; this.queryMasterRequest = queryMasterRequest; this.request = request; @@ -282,14 +283,14 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke if (LOG.isDebugEnabled()) { LOG.debug("allocateWorkerResources:" + - (new QueryId(resourceRequest.request.getQueryId())) + - ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() + - "~" + resourceRequest.request.getMaxMemoryMBPerContainer() + - ", 
requiredContainers:" + resourceRequest.request.getNumContainers() + - ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() + - "~" + resourceRequest.request.getMaxDiskSlotPerContainer() + - ", queryMasterRequest=" + resourceRequest.queryMasterRequest + - ", liveWorkers=" + rmContext.getWorkers().size()); + (new QueryId(resourceRequest.request.getQueryId())) + + ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() + + "~" + resourceRequest.request.getMaxMemoryMBPerContainer() + + ", requiredContainers:" + resourceRequest.request.getNumContainers() + + ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() + + "~" + resourceRequest.request.getMaxDiskSlotPerContainer() + + ", queryMasterRequest=" + resourceRequest.queryMasterRequest + + ", liveWorkers=" + rmContext.getWorkers().size()); } // TajoWorkerResourceManager can't return allocated disk slots occasionally. @@ -300,25 +301,25 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke if(allocatedWorkerResources.size() > 0) { List<WorkerAllocatedResource> allocatedResources = - new ArrayList<WorkerAllocatedResource>(); + new ArrayList<WorkerAllocatedResource>(); for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) { NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(), - allocatedResource.worker.getConnectionInfo().getPeerRpcPort()); + allocatedResource.worker.getConnectionInfo().getPeerRpcPort()); TajoWorkerContainerId containerId = new TajoWorkerContainerId(); containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); + ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); containerId.setId(containerIdSeq.incrementAndGet()); - ContainerIdProto containerIdProto = containerId.getProto(); + ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto(); 
allocatedResources.add(WorkerAllocatedResource.newBuilder() - .setContainerId(containerIdProto) - .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto()) - .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) - .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) - .build()); + .setContainerId(containerIdProto) + .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto()) + .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) + .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) + .build()); allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource); @@ -358,7 +359,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke int allocatedResources = 0; TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority - = resourceRequest.request.getResourceRequestPriority(); + = resourceRequest.request.getResourceRequestPriority(); if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) { synchronized(rmContext) { @@ -369,7 +370,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer(); int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer(); float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(), - resourceRequest.request.getMinDiskSlotPerContainer()); + resourceRequest.request.getMinDiskSlotPerContainer()); int liveWorkerSize = randomWorkers.size(); Set<Integer> insufficientWorkers = new HashSet<Integer>(); @@ -418,7 +419,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, - allocatedWorkerResource.allocatedMemoryMB); + allocatedWorkerResource.allocatedMemoryMB); selectedWorkers.add(allocatedWorkerResource); @@ -438,7 +439,7 @@ public class TajoWorkerResourceManager extends CompositeService 
implements Worke float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer(); float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer(); int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(), - resourceRequest.request.getMinMemoryMBPerContainer()); + resourceRequest.request.getMinMemoryMBPerContainer()); int liveWorkerSize = randomWorkers.size(); Set<Integer> insufficientWorkers = new HashSet<Integer>(); @@ -487,7 +488,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB(); } workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, - allocatedWorkerResource.allocatedMemoryMB); + allocatedWorkerResource.allocatedMemoryMB); selectedWorkers.add(allocatedWorkerResource); @@ -508,7 +509,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke * @param containerId ContainerIdProto to be released */ @Override - public void releaseWorkerResource(ContainerIdProto containerId) { + public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) { AllocatedWorkerResource allocated = allocatedResourceMap.get(containerId); if(allocated != null) { LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB); @@ -530,7 +531,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke LOG.warn("No QueryMaster resource info for " + queryId); return; } else { - ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId); + ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId); releaseWorkerResource(containerId); rmContext.getStoppedQueryIds().add(queryId); LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString()));
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index 8e8ac51..9c2b71b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -20,8 +20,8 @@ package org.apache.tajo.master.rm; import com.google.protobuf.RpcCallback; import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.tajo.QueryId; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.querymaster.QueryInProgress; @@ -63,7 +63,7 @@ public interface WorkerResourceManager extends Service { * * @param containerId ContainerIdProto to be released */ - public void releaseWorkerResource(ContainerIdProto containerId); + public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId); public String getSeedQueryId() throws IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java index ca71c53..68c57f2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java @@ -20,9 +20,9 @@ package org.apache.tajo.worker; import com.google.common.collect.Maps; import 
org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.master.ContainerProxy; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.container.TajoContainerId; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -41,29 +41,29 @@ public abstract class AbstractResourceAllocator extends CompositeService impleme workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo); } - private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap(); + private Map<TajoContainerId, ContainerProxy> containers = Maps.newConcurrentMap(); public AbstractResourceAllocator() { super(AbstractResourceAllocator.class.getName()); } - public void addContainer(ContainerId cId, ContainerProxy container) { + public void addContainer(TajoContainerId cId, ContainerProxy container) { containers.put(cId, container); } - public void removeContainer(ContainerId cId) { + public void removeContainer(TajoContainerId cId) { containers.remove(cId); } - public boolean containsContainer(ContainerId cId) { + public boolean containsContainer(TajoContainerId cId) { return containers.containsKey(cId); } - public ContainerProxy getContainer(ContainerId cId) { + public ContainerProxy getContainer(TajoContainerId cId) { return containers.get(cId); } - public Map<ContainerId, ContainerProxy> getContainers() { + public Map<TajoContainerId, ContainerProxy> getContainers() { return containers; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java index 8b9219c..b713e70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java @@ -18,12 +18,12 @@ package org.apache.tajo.worker; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.tajo.ipc.ContainerProtocol; +import org.apache.tajo.master.container.TajoContainerId; public interface ResourceAllocator { public void allocateTaskWorker(); - public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId); + public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId); public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks, int memoryMBPerTask); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 2220089..9345885 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -23,28 +23,25 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import 
org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.SubQueryContainerAllocationEvent; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.master.querymaster.SubQueryState; -import org.apache.tajo.master.rm.TajoWorkerContainer; -import org.apache.tajo.master.rm.TajoWorkerContainerId; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.master.rm.*; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; @@ -72,11 +69,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) { this.queryTaskContext = queryTaskContext; executorService = Executors.newFixedThreadPool( - queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM)); + queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM)); } @Override - public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) { + public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) { TajoWorkerContainerId containerId = new TajoWorkerContainerId(); ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId()); containerId.setApplicationAttemptId(appAttemptId); @@ -98,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { int clusterSlots = clusterResource == null ? 
0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks + - ", Number of Cluster Slots=" + clusterSlots); + ", Number of Cluster Slots=" + clusterSlots); return Math.min(numTasks, clusterSlots); } @@ -121,7 +118,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { executorService.shutdownNow(); - Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers(); + Map<TajoContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator() + .getContainers(); List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values()); for(ContainerProxy eachProxy: list) { try { @@ -156,16 +154,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { private void launchTaskRunners(LaunchTaskRunnersEvent event) { // Query in standby mode doesn't need launch Worker. 
// But, Assign ExecutionBlock to assigned tajo worker - for(Container eachContainer: event.getContainers()) { + for(TajoContainer eachContainer: event.getContainers()) { TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf, - eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson()); + eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson()); executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy)); } } - public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) { + public void stopExecutionBlock(final ExecutionBlockId executionBlockId, + Collection<TajoContainer> containers) { Set<NodeId> workers = Sets.newHashSet(); - for (Container container : containers){ + for (TajoContainer container : containers){ workers.add(container.getNodeId()); } @@ -196,8 +195,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { protected static class LaunchRunner implements Runnable { private final ContainerProxy proxy; - private final ContainerId id; - public LaunchRunner(ContainerId id, ContainerProxy proxy) { + private final TajoContainerId id; + public LaunchRunner(TajoContainerId id, ContainerProxy proxy) { this.proxy = proxy; this.id = id; } @@ -210,8 +209,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { } } - private void stopContainers(Collection<Container> containers) { - for (Container container : containers) { + private void stopContainers(Collection<TajoContainer> containers) { + for (TajoContainer container : containers) { final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId()); executorService.submit(new StopContainerRunner(container.getId(), proxy)); } @@ -219,8 +218,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { private static class StopContainerRunner implements Runnable { private 
final ContainerProxy proxy; - private final ContainerId id; - public StopContainerRunner(ContainerId id, ContainerProxy proxy) { + private final TajoContainerId id; + public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) { this.id = id; this.proxy = proxy; } @@ -251,23 +250,23 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { public void run() { LOG.info("Start TajoWorkerAllocationThread"); CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack = - new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>(); + new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>(); //TODO consider task's resource usage pattern int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); TajoMasterProtocol.WorkerResourceAllocationRequest request = - TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder() - .setMinMemoryMBPerContainer(requiredMemoryMB) - .setMaxMemoryMBPerContainer(requiredMemoryMB) - .setNumContainers(event.getRequiredNum()) - .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY - : TajoMasterProtocol.ResourceRequestPriority.DISK) - .setMinDiskSlotPerContainer(requiredDiskSlots) - .setMaxDiskSlotPerContainer(requiredDiskSlots) - .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) - .build(); + TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder() + .setMinMemoryMBPerContainer(requiredMemoryMB) + .setMaxMemoryMBPerContainer(requiredMemoryMB) + .setNumContainers(event.getRequiredNum()) + .setResourceRequestPriority(!event.isLeafQuery() ? 
TajoMasterProtocol.ResourceRequestPriority.MEMORY + : TajoMasterProtocol.ResourceRequestPriority.DISK) + .setMinDiskSlotPerContainer(requiredDiskSlots) + .setMaxDiskSlotPerContainer(requiredDiskSlots) + .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) + .build(); RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf()); NettyClientBase tmClient = null; @@ -280,21 +279,21 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); } catch (Exception e) { queryTaskContext.getQueryMasterContext().getWorkerContext(). - setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); + setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); queryTaskContext.getQueryMasterContext().getWorkerContext(). 
- setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); + setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); } } else { tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); } TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); @@ -325,17 +324,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList(); ExecutionBlockId executionBlockId = event.getExecutionBlockId(); - List<Container> containers = new ArrayList<Container>(); + List<TajoContainer> containers = new ArrayList<TajoContainer>(); for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) { TajoWorkerContainer container = new TajoWorkerContainer(); NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), - eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); + eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); TajoWorkerContainerId containerId = new TajoWorkerContainerId(); containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(), - eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId())); + ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(), + eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId())); 
containerId.setId(eachAllocatedResource.getContainerId().getId()); container.setId(containerId); @@ -347,7 +346,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots()); Worker worker = new Worker(null, workerResource, - new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo())); + new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo())); container.setWorkerResource(worker); addWorkerConnectionInfo(worker.getConnectionInfo()); containers.add(container); @@ -356,8 +355,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState(); if (!SubQuery.isRunningState(state)) { try { - List<ContainerId> containerIds = new ArrayList<ContainerId>(); - for(Container eachContainer: containers) { + List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>(); + for(TajoContainer eachContainer: containers) { containerIds.add(eachContainer.getId()); } TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); @@ -378,10 +377,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { } if(event.getRequiredNum() > numAllocatedContainers) { ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent( - event.getType(), event.getExecutionBlockId(), event.getPriority(), - event.getResource(), - event.getRequiredNum() - numAllocatedContainers, - event.isLeafQuery(), event.getProgress() + event.getType(), event.getExecutionBlockId(), event.getPriority(), + event.getResource(), + event.getRequiredNum() - numAllocatedContainers, + event.isLeafQuery(), event.getProgress() ); queryTaskContext.getEventHandler().handle(shortRequestEvent); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 1910575..4e9860b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -24,15 +24,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryUnitRequestImpl; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; +import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl; +import org.apache.tajo.master.container.TajoConverterUtils; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NullCallback; import org.jboss.netty.channel.ConnectTimeoutException; @@ -53,7 +53,7 @@ public class TaskRunner extends AbstractService { private volatile boolean stopped = false; private Path baseDirPath; - private ContainerId containerId; + private TajoContainerId containerId; // for Fetcher private ExecutorService fetchLauncher; @@ -77,7 +77,7 @@ public class TaskRunner extends AbstractService { this.fetchLauncher = Executors.newFixedThreadPool( systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory); try { - this.containerId = ConverterUtils.toContainerId(containerId); + this.containerId = TajoConverterUtils.toTajoContainerId(containerId); 
this.executionBlockContext = executionBlockContext; this.history = executionBlockContext.createTaskRunnerHistory(this); this.history.setState(getServiceState()); @@ -91,11 +91,11 @@ public class TaskRunner extends AbstractService { return getId(getContext().getExecutionBlockId(), containerId); } - public ContainerId getContainerId(){ + public TajoContainerId getContainerId(){ return containerId; } - public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) { + public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) { return executionBlockId + "," + containerId; } @@ -211,7 +211,7 @@ public class TaskRunner extends AbstractService { LOG.info("Request GetTask: " + getId()); GetTaskRequestProto request = GetTaskRequestProto.newBuilder() .setExecutionBlockId(getExecutionBlockId().getProto()) - .setContainerId(((ContainerIdPBImpl) containerId).getProto()) + .setContainerId(((TajoContainerIdPBImpl) containerId).getProto()) .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId()) .build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java index a8a11c1..364348f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java @@ -21,11 +21,11 @@ package org.apache.tajo.worker; import com.google.common.base.Objects; import com.google.common.collect.Maps; import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import 
org.apache.tajo.common.ProtoObject; +import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.container.TajoConverterUtils; import java.util.Collections; import java.util.Map; @@ -39,13 +39,13 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto; public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { private Service.STATE state; - private ContainerId containerId; + private TajoContainerId containerId; private long startTime; private long finishTime; private ExecutionBlockId executionBlockId; private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null; - public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) { + public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) { init(); this.containerId = containerId; this.executionBlockId = executionBlockId; @@ -53,7 +53,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { public TaskRunnerHistory(TaskRunnerHistoryProto proto) { this.state = Service.STATE.valueOf(proto.getState()); - this.containerId = ConverterUtils.toContainerId(proto.getContainerId()); + this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId()); this.startTime = proto.getStartTime(); this.finishTime = proto.getFinishTime(); this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId()); @@ -129,11 +129,11 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { this.state = state; } - public ContainerId getContainerId() { + public TajoContainerId getContainerId() { return containerId; } - public void setContainerId(ContainerId containerId) { + public void setContainerId(TajoContainerId containerId) { this.containerId = containerId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ContainerProtocol.proto ---------------------------------------------------------------------- diff 
--git a/tajo-core/src/main/proto/ContainerProtocol.proto b/tajo-core/src/main/proto/ContainerProtocol.proto new file mode 100644 index 0000000..df7a450 --- /dev/null +++ b/tajo-core/src/main/proto/ContainerProtocol.proto @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are public and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. 
+ */ + +option java_package = "org.apache.tajo.ipc"; +option java_outer_classname = "ContainerProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +package hadoop.yarn; + +import "Security.proto"; +import "yarn_protos.proto"; + +message TajoContainerIdProto { + optional ApplicationIdProto app_id = 1; + optional ApplicationAttemptIdProto app_attempt_id = 2; + optional int32 id = 3; +} + +message TajoContainerProto { + optional TajoContainerIdProto id = 1; + optional NodeIdProto nodeId = 2; + optional string node_http_address = 3; + optional ResourceProto resource = 4; + optional PriorityProto priority = 5; + optional hadoop.common.TokenProto container_token = 6; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/QueryMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto index 06d2a42..494d296 100644 --- a/tajo-core/src/main/proto/QueryMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto @@ -27,6 +27,9 @@ import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; import "TajoWorkerProtocol.proto"; +import "ContainerProtocol.proto"; + +package hadoop.yarn; service QueryMasterProtocolService { //from Worker http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ResourceTrackerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index b117cac..b2db46a 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -23,8 +23,11 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; import 
"TajoMasterProtocol.proto"; +import "ContainerProtocol.proto"; import "tajo_protos.proto"; +package hadoop.yarn; + message NodeHeartbeat { required WorkerConnectionInfoProto connectionInfo = 1; optional ServerStatusProto serverStatus = 2; http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index 7283543..e5eab4f 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -28,6 +28,9 @@ import "tajo_protos.proto"; import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; +import "ContainerProtocol.proto"; + +package hadoop.yarn; message ServerStatusProto { message System { @@ -119,11 +122,11 @@ message WorkerResourcesRequest { message WorkerResourceReleaseRequest { required ExecutionBlockIdProto executionBlockId = 1; - repeated hadoop.yarn.ContainerIdProto containerIds = 2; + repeated TajoContainerIdProto containerIds = 2; } message WorkerAllocatedResource { - required hadoop.yarn.ContainerIdProto containerId = 1; + required TajoContainerIdProto containerId = 1; required WorkerConnectionInfoProto connectionInfo = 2; required int32 allocatedMemoryMB = 3; http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index e515438..989b0e3 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -28,6 +28,9 @@ import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; import "Plan.proto"; +import 
"ContainerProtocol.proto"; + +package hadoop.yarn; message SessionProto { required string session_id = 1; @@ -170,7 +173,7 @@ message QueryExecutionRequestProto { message GetTaskRequestProto { required int32 workerId = 1; - required hadoop.yarn.ContainerIdProto containerId = 2; + required TajoContainerIdProto containerId = 2; required ExecutionBlockIdProto executionBlockId = 3; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index 0423894..b8fbd67 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NullCallback; @@ -150,7 +151,8 @@ public class TestTajoResourceManager { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>(); + final List<ContainerProtocol.TajoContainerIdProto> containerIds = new + ArrayList<ContainerProtocol.TajoContainerIdProto>(); RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { @@ -190,7 +192,7 @@ public class TestTajoResourceManager { containerIds.add(eachResource.getContainerId()); } - for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + 
for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); } @@ -318,7 +320,8 @@ public class TestTajoResourceManager { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>(); + final List<ContainerProtocol.TajoContainerIdProto> containerIds = new + ArrayList<ContainerProtocol.TajoContainerIdProto>(); RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { @@ -356,7 +359,7 @@ public class TestTajoResourceManager { assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size()); - for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); } @@ -399,7 +402,8 @@ public class TestTajoResourceManager { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>(); + final List<ContainerProtocol.TajoContainerIdProto> containerIds = new + ArrayList<ContainerProtocol.TajoContainerIdProto>(); RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { @@ -431,7 +435,7 @@ public class TestTajoResourceManager { assertEquals(0, totalUsedDisks, 0); - for(YarnProtos.ContainerIdProto eachContainerId: containerIds) { + for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java index 87b4197..220eb6c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -567,6 +567,7 @@ public class StorageManager { for (Path p : inputs) { FileSystem fs = p.getFileSystem(conf); + ArrayList<FileStatus> files = Lists.newArrayList(); if (fs.isFile(p)) { files.addAll(Lists.newArrayList(fs.getFileStatus(p))); http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java index a355a94..bec0daf 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -1,129 +1,138 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.S3FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.s3.InMemoryFileSystemStore; -import org.apache.tajo.storage.s3.SmallBlockS3FileSystem; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class TestFileSystems { - - protected byte[] data = null; - - private static String TEST_PATH = "target/test-data/TestFileSystem"; - private TajoConf conf = null; - private StorageManager sm = null; - private FileSystem fs = null; - Path testDir; - - public TestFileSystems(FileSystem fs) throws IOException { - conf = new TajoConf(); - - if(fs instanceof S3FileSystem){ - conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10"); - fs.initialize(URI.create(fs.getScheme() + ":///"), conf); - } - this.fs = fs; - sm = StorageManager.getStorageManager(conf); - testDir = getTestDir(this.fs, TEST_PATH); - } - - public Path getTestDir(FileSystem fs, String dir) throws IOException { - Path path = new Path(dir); - if(fs.exists(path)) - fs.delete(path, true); - - fs.mkdirs(path); - - return fs.makeQualified(path); - } - - @Parameterized.Parameters - 
public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())}, - }); - } - - @Test - public void testBlockSplit() throws IOException { - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Tuple[] tuples = new Tuple[4]; - for (int i = 0; i < tuples.length; i++) { - tuples[i] = new VTuple(3); - tuples[i] - .put(new Datum[] { DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 32), - DatumFactory.createText("name" + i) }); - } - - Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", - "table.csv"); - fs.mkdirs(path.getParent()); - - Appender appender = sm.getAppender(meta, schema, path); - appender.init(); - for (Tuple t : tuples) { - appender.addTuple(t); - } - appender.close(); - FileStatus fileStatus = fs.getFileStatus(path); - - List<FileFragment> splits = sm.getSplits("table", meta, schema, path); - int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); - assertEquals(splitSize, splits.size()); - - for (FileFragment fragment : splits) { - assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize()); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestFileSystems { + + private static String TEST_PATH = "target/test-data/TestFileSystem"; + private Configuration conf; + private StorageManager sm; + private FileSystem fs; + private Path testDir; + + public TestFileSystems(FileSystem fs) throws IOException { + this.fs = fs; + this.conf = fs.getConf(); + this.testDir = getTestDir(this.fs, TEST_PATH); + this.sm = StorageManager.getStorageManager(new TajoConf(this.conf)); + } + + public Path getTestDir(FileSystem fs, String dir) throws IOException { + Path path = new Path(dir); + if (fs.exists(path)) + fs.delete(path, true); + + fs.mkdirs(path); + + return fs.makeQualified(path); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() throws 
IOException { + return Arrays.asList(new Object[][]{ + {FileSystem.getLocal(new TajoConf())}, + }); + } + + @Before + public void setup() throws IOException { + if (!(fs instanceof LocalFileSystem)) { + conf.set("fs.local.block.size", "10"); + fs.initialize(URI.create(fs.getScheme() + ":///"), conf); + fs.setConf(conf); + } + } + + @After + public void tearDown() throws IOException { + if (!(fs instanceof LocalFileSystem)) { + fs.setConf(new TajoConf()); + } + } + + @Test + public void testBlockSplit() throws IOException { + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + Tuple[] tuples = new Tuple[4]; + for (int i = 0; i < tuples.length; i++) { + tuples[i] = new VTuple(3); + tuples[i] + .put(new Datum[]{DatumFactory.createInt4(i), + DatumFactory.createInt4(i + 32), + DatumFactory.createText("name" + i)}); + } + + Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", + "table.csv"); + fs.mkdirs(path.getParent()); + + Appender appender = sm.getAppender(meta, schema, path); + appender.init(); + for (Tuple t : tuples) { + appender.addTuple(t); + } + appender.close(); + FileStatus fileStatus = fs.getFileStatus(path); + + List<FileFragment> splits = sm.getSplits("table", meta, schema, path); + int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); + assertEquals(splitSize, splits.size()); + + for (FileFragment fragment : splits) { + assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java 
b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java deleted file mode 100644 index 7b09937..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.io.IOUtils; - -import java.io.*; - -/** - * Holds file metadata including type (regular file, or directory), - * and the list of blocks that are pointers to the data. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class INode { - - enum FileType { - DIRECTORY, FILE - } - - public static final FileType[] FILE_TYPES = { - FileType.DIRECTORY, - FileType.FILE - }; - - public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null); - - private FileType fileType; - private Block[] blocks; - - public INode(FileType fileType, Block[] blocks) { - this.fileType = fileType; - if (isDirectory() && blocks != null) { - throw new IllegalArgumentException("A directory cannot contain blocks."); - } - this.blocks = blocks; - } - - public Block[] getBlocks() { - return blocks; - } - - public FileType getFileType() { - return fileType; - } - - public boolean isDirectory() { - return fileType == FileType.DIRECTORY; - } - - public boolean isFile() { - return fileType == FileType.FILE; - } - - public long getSerializedLength() { - return 1L + (blocks == null ? 0 : 4 + blocks.length * 16); - } - - - public InputStream serialize() throws IOException { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bytes); - try { - out.writeByte(fileType.ordinal()); - if (isFile()) { - out.writeInt(blocks.length); - for (int i = 0; i < blocks.length; i++) { - out.writeLong(blocks[i].getId()); - out.writeLong(blocks[i].getLength()); - } - } - out.close(); - out = null; - } finally { - IOUtils.closeStream(out); - } - return new ByteArrayInputStream(bytes.toByteArray()); - } - - public static INode deserialize(InputStream in) throws IOException { - if (in == null) { - return null; - } - DataInputStream dataIn = new DataInputStream(in); - FileType fileType = INode.FILE_TYPES[dataIn.readByte()]; - switch (fileType) { - case DIRECTORY: - in.close(); - return INode.DIRECTORY_INODE; - case FILE: - int numBlocks = dataIn.readInt(); - Block[] blocks = new Block[numBlocks]; - for (int i = 0; i < numBlocks; i++) { - long id = dataIn.readLong(); - long length = dataIn.readLong(); - blocks[i] = new
Block(id, length); - } - in.close(); - return new INode(fileType, blocks); - default: - throw new IllegalArgumentException("Cannot deserialize inode."); - } - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java deleted file mode 100644 index 40decc2..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.tajo.common.exception.NotImplementedException; - -import java.io.*; -import java.net.URI; -import java.util.*; - -/** - * A stub implementation of {@link FileSystemStore} for testing - * {@link S3FileSystem} without actually connecting to S3. - */ -public class InMemoryFileSystemStore implements FileSystemStore { - - private Configuration conf; - private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>(); - private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>(); - - @Override - public void initialize(URI uri, Configuration conf) { - this.conf = conf; - } - - @Override - public String getVersion() throws IOException { - return "0"; - } - - @Override - public void deleteINode(Path path) throws IOException { - inodes.remove(normalize(path)); - } - - @Override - public void deleteBlock(Block block) throws IOException { - blocks.remove(block.getId()); - } - - @Override - public boolean inodeExists(Path path) throws IOException { - return inodes.containsKey(normalize(path)); - } - - @Override - public boolean blockExists(long blockId) throws IOException { - return blocks.containsKey(blockId); - } - - @Override - public INode retrieveINode(Path path) throws IOException { - return inodes.get(normalize(path)); - } - - @Override - public File retrieveBlock(Block block, long byteRangeStart) throws IOException { - byte[] data = blocks.get(block.getId()); - File file = createTempFile(); - BufferedOutputStream out = null; - try { - out = new BufferedOutputStream(new FileOutputStream(file)); - out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart); - } finally { - if (out != null) { - out.close(); - } - } - return file; - } - - private File createTempFile() throws IOException { 
- File dir = new File(conf.get("fs.s3.buffer.dir")); - if (!dir.exists() && !dir.mkdirs()) { - throw new IOException("Cannot create S3 buffer directory: " + dir); - } - File result = File.createTempFile("test-", ".tmp", dir); - result.deleteOnExit(); - return result; - } - - @Override - public Set<Path> listSubPaths(Path path) throws IOException { - Path normalizedPath = normalize(path); - // This is inefficient but more than adequate for testing purposes. - Set<Path> subPaths = new LinkedHashSet<Path>(); - for (Path p : inodes.tailMap(normalizedPath).keySet()) { - if (normalizedPath.equals(p.getParent())) { - subPaths.add(p); - } - } - return subPaths; - } - - @Override - public Set<Path> listDeepSubPaths(Path path) throws IOException { - Path normalizedPath = normalize(path); - String pathString = normalizedPath.toUri().getPath(); - if (!pathString.endsWith("/")) { - pathString += "/"; - } - // This is inefficient but more than adequate for testing purposes. - Set<Path> subPaths = new LinkedHashSet<Path>(); - for (Path p : inodes.tailMap(normalizedPath).keySet()) { - if (p.toUri().getPath().startsWith(pathString)) { - subPaths.add(p); - } - } - return subPaths; - } - - @Override - public void storeINode(Path path, INode inode) throws IOException { - inodes.put(normalize(path), inode); - } - - @Override - public void storeBlock(Block block, File file) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buf = new byte[8192]; - int numRead; - BufferedInputStream in = null; - try { - in = new BufferedInputStream(new FileInputStream(file)); - while ((numRead = in.read(buf)) >= 0) { - out.write(buf, 0, numRead); - } - } finally { - if (in != null) { - in.close(); - } - } - blocks.put(block.getId(), out.toByteArray()); - } - - private Path normalize(Path path) { - if (!path.isAbsolute()) { - throw new IllegalArgumentException("Path must be absolute: " + path); - } - return new Path(path.toUri().getPath()); - } - - @Override - public 
void purge() throws IOException { - inodes.clear(); - blocks.clear(); - } - - @Override - public void dump() throws IOException { - throw new NotImplementedException(); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java deleted file mode 100644 index d4034b9..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.s3; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.hadoop.util.Progressable; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -@InterfaceAudience.Private -@InterfaceStability.Unstable -class S3OutputStream extends OutputStream { - - private Configuration conf; - - private int bufferSize; - - private FileSystemStore store; - - private Path path; - - private long blockSize; - - private File backupFile; - - private OutputStream backupStream; - - private Random r = new Random(); - - private boolean closed; - - private int pos = 0; - - private long filePos = 0; - - private int bytesWrittenToBlock = 0; - - private byte[] outBuf; - - private List<Block> blocks = new ArrayList<Block>(); - - private Block nextBlock; - - private static final Log LOG = - LogFactory.getLog(S3OutputStream.class.getName()); - - - public S3OutputStream(Configuration conf, FileSystemStore store, - Path path, long blockSize, Progressable progress, - int buffersize) throws IOException { - - this.conf = conf; - this.store = store; - this.path = path; - this.blockSize = blockSize; - this.backupFile = newBackupFile(); - this.backupStream = new FileOutputStream(backupFile); - this.bufferSize = buffersize; - this.outBuf = new byte[bufferSize]; - - } - - private File newBackupFile() throws IOException { - File dir = new File(conf.get("fs.s3.buffer.dir")); - if (!dir.exists() && !dir.mkdirs()) { - throw new IOException("Cannot create S3 buffer directory: " +
dir); - } - File result = File.createTempFile("output-", ".tmp", dir); - result.deleteOnExit(); - return result; - } - - public long getPos() throws IOException { - return filePos; - } - - @Override - public synchronized void write(int b) throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - - if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) { - flush(); - } - outBuf[pos++] = (byte) b; - filePos++; - } - - @Override - public synchronized void write(byte b[], int off, int len) throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - while (len > 0) { - int remaining = bufferSize - pos; - int toWrite = Math.min(remaining, len); - System.arraycopy(b, off, outBuf, pos, toWrite); - pos += toWrite; - off += toWrite; - len -= toWrite; - filePos += toWrite; - - if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) { - flush(); - } - } - } - - @Override - public synchronized void flush() throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - - if (bytesWrittenToBlock + pos >= blockSize) { - flushData((int) blockSize - bytesWrittenToBlock); - } - if (bytesWrittenToBlock == blockSize) { - endBlock(); - } - flushData(pos); - } - - private synchronized void flushData(int maxPos) throws IOException { - int workingPos = Math.min(pos, maxPos); - - if (workingPos > 0) { - // - // To the local block backup, write just the bytes - // - backupStream.write(outBuf, 0, workingPos); - - // - // Track position - // - bytesWrittenToBlock += workingPos; - System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); - pos -= workingPos; - } - } - - private synchronized void endBlock() throws IOException { - // - // Done with local copy - // - backupStream.close(); - - // - // Send it to S3 - // - // TODO: Use passed in Progressable to report progress. 
- nextBlockOutputStream(); - store.storeBlock(nextBlock, backupFile); - Block[] arr = new Block[blocks.size()]; - arr = blocks.toArray(arr); - store.storeINode(path, new INode(INode.FILE_TYPES[1], arr)); - - // - // Delete local backup, start new one - // - boolean b = backupFile.delete(); - if (!b) { - LOG.warn("Ignoring failed delete"); - } - backupFile = newBackupFile(); - backupStream = new FileOutputStream(backupFile); - bytesWrittenToBlock = 0; - } - - private synchronized void nextBlockOutputStream() throws IOException { - long blockId = r.nextLong(); - while (store.blockExists(blockId)) { - blockId = r.nextLong(); - } - nextBlock = new Block(blockId, bytesWrittenToBlock); - blocks.add(nextBlock); - bytesWrittenToBlock = 0; - } - - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - - flush(); - if (filePos == 0 || bytesWrittenToBlock != 0) { - endBlock(); - } - - backupStream.close(); - boolean b = backupFile.delete(); - if (!b) { - LOG.warn("Ignoring failed delete"); - } - - super.close(); - - closed = true; - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java deleted file mode 100644 index fc1c908..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.hadoop.fs.s3.S3FileSystem; -import org.apache.hadoop.util.Progressable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -public class SmallBlockS3FileSystem extends S3FileSystem { - - private URI uri; - - private FileSystemStore store; - - private Path workingDir; - - static class Holder { - private static InMemoryFileSystemStore s; - - public synchronized static FileSystemStore get() { - if(s != null) { - return s; - } - s = new InMemoryFileSystemStore(); - return s; - } - - public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) { - s = inMemoryFileSystemStore; - } - } - - public SmallBlockS3FileSystem() { - } - - - public SmallBlockS3FileSystem( - InMemoryFileSystemStore inMemoryFileSystemStore) { - Holder.set(inMemoryFileSystemStore); - this.store = inMemoryFileSystemStore; - } - - @Override - public URI getUri() { - return uri; - } - @Override - 
public long getDefaultBlockSize() { - return 10; - } - - @Override - public void initialize(URI uri, Configuration conf) throws IOException { - if (store == null) { - store = Holder.get(); - } - store.initialize(uri, conf); - setConf(conf); - this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.workingDir = - new Path("/user", System.getProperty("user.name")).makeQualified(this); - } - @Override - public boolean isFile(Path path) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(path)); - if (inode == null) { - return false; - } - return inode.isFile(); - } - - private INode checkFile(Path path) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(path)); - if (inode == null) { - throw new IOException("No such file."); - } - if (inode.isDirectory()) { - throw new IOException("Path " + path + " is a directory."); - } - return inode; - } - - @Override - public FileStatus[] listStatus(Path f) throws IOException { - Path absolutePath = makeAbsolute(f); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - throw new FileNotFoundException("File " + f + " does not exist."); - } - if (inode.isFile()) { - return new FileStatus[] { - new S3FileStatus(f.makeQualified(this), inode) - }; - } - ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); - for (Path p : store.listSubPaths(absolutePath)) { - ret.add(getFileStatus(p.makeQualified(this))); - } - return ret.toArray(new FileStatus[0]); - } - @Override - public FSDataOutputStream create(Path file, FsPermission permission, - boolean overwrite, int bufferSize, - short replication, long blockSize, Progressable progress) - throws IOException { - - INode inode = store.retrieveINode(makeAbsolute(file)); - if (inode != null) { - if (overwrite) { - delete(file, true); - } else { - throw new IOException("File already exists: " + file); - } - } else { - Path parent = file.getParent(); - if (parent != null) { - if (!mkdirs(parent)) { - throw new 
IOException("Mkdirs failed to create " + parent.toString()); - } - } - } - return new FSDataOutputStream - (new S3OutputStream(getConf(), store, makeAbsolute(file), - blockSize, progress, bufferSize), - statistics); - } - @Override - public boolean mkdirs(Path path, FsPermission permission) throws IOException { - Path absolutePath = makeAbsolute(path); - List<Path> paths = new ArrayList<Path>(); - do { - paths.add(0, absolutePath); - absolutePath = absolutePath.getParent(); - } while (absolutePath != null); - - boolean result = true; - for (Path p : paths) { - result &= mkdir(p); - } - return result; - } - - @Override - public Path getWorkingDirectory() { - return workingDir; - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - Path absoluteSrc = makeAbsolute(src); - INode srcINode = store.retrieveINode(absoluteSrc); - if (srcINode == null) { - // src path doesn't exist - return false; - } - Path absoluteDst = makeAbsolute(dst); - INode dstINode = store.retrieveINode(absoluteDst); - if (dstINode != null && dstINode.isDirectory()) { - absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); - dstINode = store.retrieveINode(absoluteDst); - } - if (dstINode != null) { - // dst path already exists - can't overwrite - return false; - } - Path dstParent = absoluteDst.getParent(); - if (dstParent != null) { - INode dstParentINode = store.retrieveINode(dstParent); - if (dstParentINode == null || dstParentINode.isFile()) { - // dst parent doesn't exist or is a file - return false; - } - } - return renameRecursive(absoluteSrc, absoluteDst); - } - - private boolean renameRecursive(Path src, Path dst) throws IOException { - INode srcINode = store.retrieveINode(src); - store.storeINode(dst, srcINode); - store.deleteINode(src); - if (srcINode.isDirectory()) { - for (Path oldSrc : store.listDeepSubPaths(src)) { - INode inode = store.retrieveINode(oldSrc); - if (inode == null) { - return false; - } - String oldSrcPath = oldSrc.toUri().getPath(); 
- String srcPath = src.toUri().getPath(); - String dstPath = dst.toUri().getPath(); - Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); - store.storeINode(newDst, inode); - store.deleteINode(oldSrc); - } - } - return true; - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - Path absolutePath = makeAbsolute(path); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - return false; - } - if (inode.isFile()) { - store.deleteINode(absolutePath); - for (Block block: inode.getBlocks()) { - store.deleteBlock(block); - } - } else { - FileStatus[] contents = null; - try { - contents = listStatus(absolutePath); - } catch(FileNotFoundException fnfe) { - return false; - } - - if ((contents.length !=0) && (!recursive)) { - throw new IOException("Directory " + path.toString() - + " is not empty."); - } - for (FileStatus p:contents) { - if (!delete(p.getPath(), recursive)) { - return false; - } - } - store.deleteINode(absolutePath); - } - return true; - } - - /** - * FileStatus for S3 file systems. 
- */ - @Override - public FileStatus getFileStatus(Path f) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(f)); - if (inode == null) { - throw new FileNotFoundException(f + ": No such file or directory."); - } - return new S3FileStatus(f.makeQualified(this), inode); - } - private boolean mkdir(Path path) throws IOException { - Path absolutePath = makeAbsolute(path); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - store.storeINode(absolutePath, INode.DIRECTORY_INODE); - } else if (inode.isFile()) { - throw new IOException(String.format( - "Can't make directory for path %s since it is a file.", - absolutePath)); - } - return true; - } - private Path makeAbsolute(Path path) { - if (path.isAbsolute()) { - return path; - } - return new Path(workingDir, path); - } - - private static class S3FileStatus extends FileStatus { - - S3FileStatus(Path f, INode inode) throws IOException { - super(findLength(inode), inode.isDirectory(), 1, - findBlocksize(inode), 0, f); - } - - private static long findLength(INode inode) { - if (!inode.isDirectory()) { - long length = 0L; - for (Block block : inode.getBlocks()) { - length += block.getLength(); - } - return length; - } - return 0; - } - - private static long findBlocksize(INode inode) { - final Block[] ret = inode.getBlocks(); - return ret == null ? 0L : ret[0].getLength(); - } - } -} \ No newline at end of file
