http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 804bacd..169e5ea 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -46,7 +46,6 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.control.CcId; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; @@ -57,6 +56,7 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.InvokeUtil; @@ -74,15 +74,15 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.work.FutureValue; import org.apache.hyracks.control.common.work.WorkQueue; import 
org.apache.hyracks.control.nc.application.NCServiceContext; -import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager; import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask; import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask; import org.apache.hyracks.control.nc.io.IOManager; -import org.apache.hyracks.control.nc.net.DatasetNetworkManager; import org.apache.hyracks.control.nc.net.MessagingNetworkManager; import org.apache.hyracks.control.nc.net.NetworkManager; +import org.apache.hyracks.control.nc.net.ResultNetworkManager; import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.control.nc.resources.memory.MemoryManager; +import org.apache.hyracks.control.nc.result.ResultPartitionManager; import org.apache.hyracks.control.nc.work.AbortAllJobsWork; import org.apache.hyracks.control.nc.work.BuildJobProfilesWork; import org.apache.hyracks.ipc.api.IIPCEventListener; @@ -118,9 +118,9 @@ public class NodeControllerService implements IControllerService { private NetworkManager netManager; - private IDatasetPartitionManager datasetPartitionManager; + private IResultPartitionManager resultPartitionManager; - private DatasetNetworkManager datasetNetworkManager; + private ResultNetworkManager resultNetworkManager; private final WorkQueue workQueue; @@ -262,10 +262,10 @@ public class NodeControllerService implements IControllerService { } private void init() { - datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(), + resultPartitionManager = new ResultPartitionManager(this, executor, ncConfig.getResultManagerMemory(), ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold()); - datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(), - ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(), + resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(), + 
ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE); if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) { @@ -289,7 +289,7 @@ public class NodeControllerService implements IControllerService { netManager.start(); startApplication(); init(); - datasetNetworkManager.start(); + resultNetworkManager.start(); if (messagingNetManager != null) { messagingNetManager.start(); } @@ -323,11 +323,11 @@ public class NodeControllerService implements IControllerService { } HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos); - NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress(); + NetworkAddress resultAddress = resultNetworkManager.getPublicNetworkAddress(); NetworkAddress netAddress = netManager.getPublicNetworkAddress(); NetworkAddress messagingAddress = messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null; - nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, hbSchema, + nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, resultAddress, hbSchema, messagingAddress, application.getCapacity()); ncData = new NodeControllerData(nodeRegistration); @@ -494,9 +494,9 @@ public class NodeControllerService implements IControllerService { LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown"); } partitionManager.close(); - datasetPartitionManager.close(); + resultPartitionManager.close(); netManager.stop(); - datasetNetworkManager.stop(); + resultNetworkManager.stop(); if (messagingNetManager != null) { messagingNetManager.stop(); } @@ -582,8 +582,8 @@ public class NodeControllerService implements IControllerService { return netManager; } - public DatasetNetworkManager getDatasetNetworkManager() { - return datasetNetworkManager; + public ResultNetworkManager getResultNetworkManager() { + return resultNetworkManager; } public PartitionManager getPartitionManager() { @@ -645,8 +645,8 @@ public class NodeControllerService implements IControllerService { getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id); } - public IDatasetPartitionManager getDatasetPartitionManager() { - return datasetPartitionManager; + public IResultPartitionManager getResultPartitionManager() { + return resultPartitionManager; } public MessagingNetworkManager getMessagingNetworkManager() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 340924d..f6531d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -43,7 +43,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.state.IStateObject; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -57,6 +56,7 @@ import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.job.PartitionState; @@ -427,8 +427,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { } @Override - public IDatasetPartitionManager getDatasetPartitionManager() { - return ncs.getDatasetPartitionManager(); + public 
IResultPartitionManager getResultPartitionManager() { + return ncs.getResultPartitionManager(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java deleted file mode 100644 index 37cddb8..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.nc.dataset; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.partitions.ResultSetPartitionId; - -public class DatasetMemoryManager { - private int availableMemory; - - private final Set<Page> availPages; - - private final LeastRecentlyUsedList leastRecentlyUsedList; - - private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap; - - private final static int FRAME_SIZE = 32768; - - public DatasetMemoryManager(int availableMemory) { - this.availableMemory = availableMemory; - - availPages = new HashSet<Page>(); - - // Atleast have one page for temporarily storing the results. - if (this.availableMemory <= FRAME_SIZE) - this.availableMemory = FRAME_SIZE; - - leastRecentlyUsedList = new LeastRecentlyUsedList(); - resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>(); - } - - public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState) - throws HyracksDataException { - Page page; - if (availPages.isEmpty()) { - if (availableMemory >= FRAME_SIZE) { - /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame() - * instead of direct ByteBuffer.allocate()? - */ - availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE))); - availableMemory -= FRAME_SIZE; - page = getAvailablePage(); - } else { - page = evictPage(); - } - } else { - page = getAvailablePage(); - } - - page.clear(); - - /* - * It is extremely important to update the reference after obtaining the page because, in the cases where memory - * manager is allocated only one page of memory, the front of the LRU list should not be created by the - * update reference call before a page is pushed on to the element of the LRU list. 
So we first obtain the page, - * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it. - */ - PartitionNode pn = updateReference(resultSetPartitionId, resultState); - pn.add(page); - return page; - } - - public void pageReferenced(ResultSetPartitionId resultSetPartitionId) { - // When a page is referenced the dataset partition writer should already be known, so we pass null. - updateReference(resultSetPartitionId, null); - } - - public static int getPageSize() { - return FRAME_SIZE; - } - - protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) { - leastRecentlyUsedList.add(pn); - resultPartitionNodesMap.put(resultSetPartitionId, pn); - } - - protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) { - PartitionNode pn = null; - - if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) { - if (resultState != null) { - pn = new PartitionNode(resultSetPartitionId, resultState); - insertPartitionNode(resultSetPartitionId, pn); - } - return pn; - } - synchronized (this) { - pn = resultPartitionNodesMap.get(resultSetPartitionId); - leastRecentlyUsedList.remove(pn); - insertPartitionNode(resultSetPartitionId, pn); - } - - return pn; - } - - protected Page evictPage() throws HyracksDataException { - PartitionNode pn = leastRecentlyUsedList.getFirst(); - ResultState resultState = pn.getResultState(); - Page page = resultState.returnPage(); - - /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take - * away all the pages allocated to it and add to the available pages set. 
- */ - if (page == null) { - availPages.addAll(pn); - pn.clear(); - resultPartitionNodesMap.remove(pn.getResultSetPartitionId()); - leastRecentlyUsedList.remove(pn); - - /* Based on the assumption that if the dataset partition writer returned a null page, it should be lying about - * the number of pages it holds in which case we just evict all the pages it holds and should thus be able to - * add all those pages to available set and we have at least one page to allocate back. - */ - page = getAvailablePage(); - } else { - pn.remove(page); - - // If the partition no more holds any pages, remove it from the linked list and the hash map. - if (pn.isEmpty()) { - resultPartitionNodesMap.remove(pn.getResultSetPartitionId()); - leastRecentlyUsedList.remove(pn); - } - } - - return page; - } - - protected Page getAvailablePage() { - Iterator<Page> iter = availPages.iterator(); - Page page = iter.next(); - iter.remove(); - return page; - } - - private class LeastRecentlyUsedList { - private PartitionNode head; - - private PartitionNode tail; - - public LeastRecentlyUsedList() { - head = null; - tail = null; - } - - public void add(PartitionNode node) { - if (head == null) { - head = tail = node; - return; - } - tail.setNext(node); - node.setPrev(tail); - tail = node; - } - - public void remove(PartitionNode node) { - if ((node == head) && (node == tail)) { - head = tail = null; - return; - } else if (node == head) { - head = head.getNext(); - head.setPrev(null); - return; - } else if (node == tail) { - tail = tail.getPrev(); - tail.setNext(null); - return; - } else { - PartitionNode prev = node.getPrev(); - PartitionNode next = node.getNext(); - prev.setNext(next); - next.setPrev(prev); - } - } - - public PartitionNode getFirst() { - return head; - } - } - - private class PartitionNode extends HashSet<Page> { - private static final long serialVersionUID = 1L; - - private final ResultSetPartitionId resultSetPartitionId; - - private final ResultState resultState; - - private 
PartitionNode prev; - - private PartitionNode next; - - public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) { - this.resultSetPartitionId = resultSetPartitionId; - this.resultState = resultState; - prev = null; - next = null; - } - - public ResultSetPartitionId getResultSetPartitionId() { - return resultSetPartitionId; - } - - public ResultState getResultState() { - return resultState; - } - - public void setPrev(PartitionNode node) { - prev = node; - } - - public PartitionNode getPrev() { - return prev; - } - - public void setNext(PartitionNode node) { - next = node; - } - - public PartitionNode getNext() { - return next; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java deleted file mode 100644 index b7cf9a4..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.nc.dataset; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.io.IWorkspaceFileFactory; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.common.dataset.AbstractDatasetManager; -import org.apache.hyracks.control.common.dataset.ResultStateSweeper; -import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; -import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager { - private static final Logger LOGGER = LogManager.getLogger(); - - private final NodeControllerService ncs; - - private final Executor executor; - - private final Map<JobId, ResultSetMap> partitionResultStateMap; - - private final DefaultDeallocatableRegistry deallocatableRegistry; - - private final IWorkspaceFileFactory fileFactory; - - private final DatasetMemoryManager datasetMemoryManager; - - public DatasetPartitionManager(NodeControllerService ncs, Executor executor, 
int availableMemory, long resultTTL, - long resultSweepThreshold) { - super(resultTTL); - this.ncs = ncs; - this.executor = executor; - deallocatableRegistry = new DefaultDeallocatableRegistry(); - fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager()); - if (availableMemory >= DatasetMemoryManager.getPageSize()) { - datasetMemoryManager = new DatasetMemoryManager(availableMemory); - } else { - datasetMemoryManager = null; - } - partitionResultStateMap = new HashMap<>(); - executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER)); - } - - @Override - public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, - boolean asyncMode, int partition, int nPartitions, long maxReads) { - DatasetPartitionWriter dpw; - JobId jobId = ctx.getJobletContext().getJobId(); - synchronized (this) { - dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions, - datasetMemoryManager, fileFactory, maxReads); - ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap()); - ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions); - resultStates[partition] = dpw.getResultState(); - } - LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", jobId, partition); - return dpw; - } - - @Override - public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, - boolean orderedResult, boolean emptyResult) throws HyracksException { - try { - // Be sure to send the *public* network address to the CC - ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult, - emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress()); - } catch (Exception e) { - throw HyracksException.create(e); - } - } - - @Override - public void reportPartitionWriteCompletion(JobId 
jobId, ResultSetId rsId, int partition) throws HyracksException { - try { - LOGGER.debug("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId, - partition); - ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition); - } catch (Exception e) { - throw HyracksException.create(e); - } - } - - @Override - public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, - IFrameWriter writer) throws HyracksException { - ResultState resultState = getResultState(jobId, resultSetId, partition); - DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState); - dpr.writeTo(writer); - LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId, - partition); - } - - private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition) - throws HyracksException { - ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); - if (rsIdMap == null) { - throw new HyracksException("Unknown JobId " + jobId); - } - ResultState[] resultStates = rsIdMap.getResultStates(resultSetId); - if (resultStates == null) { - throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId); - } - ResultState resultState = resultStates[partition]; - if (resultState == null) { - throw new HyracksException("No DatasetPartitionWriter for partition " + partition); - } - return resultState; - } - - @Override - public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) { - ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); - if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) { - partitionResultStateMap.remove(jobId); - } - } - - @Override - public synchronized void abortReader(JobId jobId) { - ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); - if (rsIdMap != null) { 
- rsIdMap.abortAll(); - } - } - - @Override - public synchronized void close() { - for (JobId jobId : getJobIds()) { - deinit(jobId); - } - deallocatableRegistry.close(); - } - - @Override - public synchronized Set<JobId> getJobIds() { - return partitionResultStateMap.keySet(); - } - - @Override - public synchronized ResultSetMap getState(JobId jobId) { - return partitionResultStateMap.get(jobId); - } - - @Override - public synchronized void sweep(JobId jobId) { - deinit(jobId); - partitionResultStateMap.remove(jobId); - } - - private synchronized void deinit(JobId jobId) { - ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); - if (rsIdMap != null) { - rsIdMap.closeAndDeleteAll(); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java deleted file mode 100644 index 8c4fcb0..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.nc.dataset; - -import java.nio.ByteBuffer; -import java.util.concurrent.Executor; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.partitions.ResultSetPartitionId; -import org.apache.hyracks.comm.channels.NetworkOutputChannel; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class DatasetPartitionReader { - private static final Logger LOGGER = LogManager.getLogger(); - - private final DatasetPartitionManager datasetPartitionManager; - private final DatasetMemoryManager datasetMemoryManager; - private final Executor executor; - private final ResultState resultState; - - public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager, - DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) { - this.datasetPartitionManager = datasetPartitionManager; - this.datasetMemoryManager = datasetMemoryManager; - this.executor = executor; - this.resultState = resultState; - } - - public void writeTo(final IFrameWriter writer) { - executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer)); - } - - private class ResultPartitionSender implements Runnable { - - private final NetworkOutputChannel channel; - - ResultPartitionSender(final NetworkOutputChannel channel) { - this.channel = channel; - } - - @Override - public void run() { - channel.setFrameSize(resultState.getFrameSize()); - channel.open(); - try { - 
resultState.readOpen(); - long offset = 0; - final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize()); - while (true) { - buffer.clear(); - final long size = read(offset, buffer); - if (size <= 0) { - break; - } else if (size < buffer.limit()) { - throw new IllegalStateException( - "Premature end of file - readSize: " + size + " buffer limit: " + buffer.limit()); - } - offset += size; - buffer.flip(); - channel.nextFrame(buffer); - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("result reading successful(" + resultState.getResultSetPartitionId() + ")"); - } - } catch (Exception e) { - LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e); - channel.abort(); - } finally { - close(); - } - } - - private long read(long offset, ByteBuffer buffer) throws HyracksDataException { - return datasetMemoryManager != null ? resultState.read(datasetMemoryManager, offset, buffer) - : resultState.read(offset, buffer); - } - - private void close() { - try { - channel.close(); - resultState.readClose(); - if (resultState.isExhausted()) { - final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId(); - datasetPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(), - partitionId.getPartition()); - } - } catch (HyracksDataException e) { - LOGGER.error("unexpected failure in partition reader clean up", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java deleted file mode 100644 index b593bb5..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.nc.dataset; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.io.IWorkspaceFileFactory; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.partitions.ResultSetPartitionId; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class DatasetPartitionWriter implements IFrameWriter { - private static final Logger LOGGER = LogManager.getLogger(); - - private final IDatasetPartitionManager manager; - - private final JobId jobId; - - private final ResultSetId resultSetId; - - private final boolean orderedResult; - - private final int partition; - - private final int nPartitions; - - private final DatasetMemoryManager datasetMemoryManager; - - private final ResultSetPartitionId resultSetPartitionId; - - private final ResultState resultState; - - private boolean partitionRegistered; - - private boolean failed = false; - - public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId, - ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions, - DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) { - this.manager = manager; - this.jobId = jobId; - this.resultSetId = rsId; - this.orderedResult = orderedResult; - this.partition = partition; - this.nPartitions = nPartitions; - this.datasetMemoryManager = datasetMemoryManager; - - resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition); - resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory, - 
ctx.getInitialFrameSize(), maxReads); - } - - public ResultState getResultState() { - return resultState; - } - - @Override - public void open() { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("open(" + partition + ")"); - } - partitionRegistered = false; - resultState.open(); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - registerResultPartitionLocation(false); - if (datasetMemoryManager == null) { - resultState.write(buffer); - } else { - resultState.write(datasetMemoryManager, buffer); - } - } - - @Override - public void fail() throws HyracksDataException { - failed = true; - resultState.closeAndDelete(); - resultState.abort(); - } - - @Override - public void close() throws HyracksDataException { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("close(" + partition + ")"); - } - try { - if (!failed) { - registerResultPartitionLocation(true); - } - } finally { - resultState.close(); - } - try { - if (partitionRegistered) { - manager.reportPartitionWriteCompletion(jobId, resultSetId, partition); - } - } catch (HyracksException e) { - throw HyracksDataException.create(e); - } - } - - void registerResultPartitionLocation(boolean empty) throws HyracksDataException { - try { - if (!partitionRegistered) { - manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, - empty); - partitionRegistered = true; - } - } catch (HyracksException e) { - if (e instanceof HyracksDataException) { - throw (HyracksDataException) e; - } else { - throw HyracksDataException.create(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java deleted file mode 100644 index 2eb33fd..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.nc.dataset; - -import java.nio.ByteBuffer; - -public class Page { - private final ByteBuffer buffer; - - public Page(ByteBuffer buffer) { - this.buffer = buffer; - } - - public ByteBuffer getBuffer() { - return buffer; - } - - public ByteBuffer clear() { - return (ByteBuffer) buffer.clear(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java deleted file mode 100644 index 1a64a5a..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.nc.dataset; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hyracks.api.dataset.IDatasetStateRecord; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.job.JobId; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -class ResultSetMap implements IDatasetStateRecord, Serializable { - private static final long serialVersionUID = 1L; - - private static final Logger LOGGER = LogManager.getLogger(); - - private final long timestamp; - private final HashMap<ResultSetId, ResultState[]> resultStateMap; - - ResultSetMap() { - timestamp = System.nanoTime(); - resultStateMap = new HashMap<>(); - } - - @Override - public long getTimestamp() { - return timestamp; - } - - ResultState[] getResultStates(ResultSetId rsId) { - return resultStateMap.get(rsId); - } - - ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) { - return resultStateMap.computeIfAbsent(rsId, (k) -> new ResultState[nPartitions]); - } - - /** - * removes a result partition for a result set - * - * @param jobId - * the id of the job that produced the result set - * @param resultSetId - * the id of the result set - * @param partition - * the partition number - * @return true, if all partitions for the resultSetId have been removed - */ - boolean removePartition(JobId jobId, ResultSetId resultSetId, int partition) { - final ResultState[] resultStates = resultStateMap.get(resultSetId); - if (resultStates != null) { - final ResultState state = resultStates[partition]; - if (state != null) { - state.closeAndDelete(); - LOGGER.debug("Removing partition: " + partition + " for JobId: " + jobId); - } - resultStates[partition] = null; - boolean stateEmpty = true; - for (ResultState resState : resultStates) { - if (resState != null) { - stateEmpty = false; - break; - } - } - if (stateEmpty) { - resultStateMap.remove(resultSetId); - } - 
return resultStateMap.isEmpty(); - } - return true; - } - - void abortAll() { - applyToAllStates((rsId, state, i) -> state.abort()); - } - - void closeAndDeleteAll() { - applyToAllStates((rsId, state, i) -> { - state.closeAndDelete(); - LOGGER.debug("Removing partition: " + i + " for result set " + rsId); - }); - } - - @FunctionalInterface - private interface StateModifier { - void modify(ResultSetId rsId, ResultState entry, int partition); - } - - private void applyToAllStates(StateModifier modifier) { - for (Map.Entry<ResultSetId, ResultState[]> entry : resultStateMap.entrySet()) { - final ResultSetId rsId = entry.getKey(); - final ResultState[] resultStates = entry.getValue(); - if (resultStates == null) { - continue; - } - for (int i = 0; i < resultStates.length; i++) { - final ResultState state = resultStates[i]; - if (state != null) { - modifier.modify(rsId, state, i); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java deleted file mode 100644 index 6b35912..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.nc.dataset; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hyracks.api.dataflow.state.IStateObject; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IFileHandle; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.api.io.IWorkspaceFileFactory; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.partitions.ResultSetPartitionId; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -public class ResultState implements IStateObject { - private static final String FILE_PREFIX = "result_"; - - private final ResultSetPartitionId resultSetPartitionId; - - private final boolean asyncMode; - - private final int frameSize; - - private final IIOManager ioManager; - - private final IWorkspaceFileFactory fileFactory; - - private final AtomicBoolean eos; - - private final AtomicBoolean failed; - - private final List<Page> localPageList; - - private 
FileReference fileRef; - - private IFileHandle fileHandle; - - private volatile int referenceCount = 0; - - private long size; - - private long persistentSize; - private long remainingReads; - - ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager, - IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) { - if (maxReads <= 0) { - throw new IllegalArgumentException("maxReads must be > 0"); - } - this.resultSetPartitionId = resultSetPartitionId; - this.asyncMode = asyncMode; - this.ioManager = ioManager; - this.fileFactory = fileFactory; - this.frameSize = frameSize; - remainingReads = maxReads; - eos = new AtomicBoolean(false); - failed = new AtomicBoolean(false); - localPageList = new ArrayList<>(); - - fileRef = null; - fileHandle = null; - } - - public synchronized void open() { - size = 0; - persistentSize = 0; - referenceCount = 0; - } - - public synchronized void close() { - eos.set(true); - closeWriteFileHandle(); - notifyAll(); - } - - public synchronized void closeAndDelete() { - // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs - // to be taken when there are more requests to these result states. - failed.set(true); - closeWriteFileHandle(); - if (fileRef != null) { - fileRef.delete(); - fileRef = null; - } - } - - private void closeWriteFileHandle() { - if (fileHandle != null) { - doCloseFileHandle(); - } - } - - private void doCloseFileHandle() { - if (--referenceCount == 0) { - // close the file if there is no more reference - try { - ioManager.close(fileHandle); - } catch (IOException e) { - // Since file handle could not be closed, just ignore. 
- } - fileHandle = null; - } - } - - public synchronized void write(ByteBuffer buffer) throws HyracksDataException { - if (fileRef == null) { - initWriteFileHandle(); - } - - size += ioManager.syncWrite(fileHandle, size, buffer); - notifyAll(); - } - - public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer) - throws HyracksDataException { - int srcOffset = 0; - Page destPage = null; - - if (!localPageList.isEmpty()) { - destPage = localPageList.get(localPageList.size() - 1); - } - - while (srcOffset < buffer.limit()) { - if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) { - destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this); - localPageList.add(destPage); - } - int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining()); - destPage.getBuffer().put(buffer.array(), srcOffset, srcLength); - srcOffset += srcLength; - size += srcLength; - } - - notifyAll(); - } - - public synchronized void readOpen() { - if (isExhausted()) { - throw new IllegalStateException("Result reads exhausted"); - } - remainingReads--; - } - - public synchronized void readClose() throws HyracksDataException { - if (fileHandle != null) { - doCloseFileHandle(); - } - } - - public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException { - long readSize = 0; - - while (offset >= size && !eos.get() && !failed.get()) { - try { - wait(); - } catch (InterruptedException e) { - throw HyracksDataException.create(e); - } - } - if ((offset >= size && eos.get()) || failed.get()) { - return readSize; - } - - if (fileHandle == null) { - initReadFileHandle(); - } - readSize = ioManager.syncRead(fileHandle, offset, buffer); - - return readSize; - } - - public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer) - throws HyracksDataException { - long readSize = 0; - while (offset >= size && !eos.get() && !failed.get()) { - try { - wait(); - } 
catch (InterruptedException e) { - throw HyracksDataException.create(e); - } - } - - if ((offset >= size && eos.get()) || failed.get()) { - return readSize; - } - - if (offset < persistentSize) { - if (fileHandle == null) { - initReadFileHandle(); - } - readSize = ioManager.syncRead(fileHandle, offset, buffer); - if (readSize < 0) { - throw new HyracksDataException("Premature end of file"); - } - } - - if (readSize < buffer.capacity()) { - long localPageOffset = offset - persistentSize; - int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize()); - int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize()); - Page page = getPage(localPageIndex); - if (page == null) { - return readSize; - } - readSize += buffer.remaining(); - buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining()); - } - datasetMemoryManager.pageReferenced(resultSetPartitionId); - return readSize; - } - - public synchronized void abort() { - failed.set(true); - notifyAll(); - } - - public synchronized Page returnPage() throws HyracksDataException { - Page page = removePage(); - - // If we do not have any pages to be given back close the write channel since we don't write any more, return null. 
- if (page == null) { - ioManager.close(fileHandle); - return null; - } - - page.getBuffer().flip(); - - if (fileRef == null) { - initWriteFileHandle(); - } - - long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer()); - persistentSize += delta; - return page; - } - - public synchronized void setEOS(boolean eos) { - this.eos.set(eos); - } - - public ResultSetPartitionId getResultSetPartitionId() { - return resultSetPartitionId; - } - - public int getFrameSize() { - return frameSize; - } - - public IIOManager getIOManager() { - return ioManager; - } - - public boolean getAsyncMode() { - return asyncMode; - } - - @Override - public JobId getJobId() { - return resultSetPartitionId.getJobId(); - } - - @Override - public Object getId() { - return resultSetPartitionId; - } - - @Override - public long getMemoryOccupancy() { - throw new UnsupportedOperationException(); - } - - @Override - public void toBytes(DataOutput out) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void fromBytes(DataInput in) throws IOException { - throw new UnsupportedOperationException(); - } - - private Page getPage(int index) { - Page page = null; - if (!localPageList.isEmpty()) { - page = localPageList.get(index); - } - return page; - } - - private Page removePage() { - Page page = null; - if (!localPageList.isEmpty()) { - page = localPageList.remove(localPageList.size() - 1); - } - return page; - } - - private void initWriteFileHandle() throws HyracksDataException { - if (fileHandle == null) { - String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); - fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); - fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - if (referenceCount != 0) { - throw new IllegalStateException("Illegal reference count " + referenceCount); - } - referenceCount = 1; - notifyAll(); // NOSONAR: 
always called from a synchronized block - } - } - - private void initReadFileHandle() throws HyracksDataException { - while (fileRef == null && !failed.get()) { - // wait for writer to create the file - try { - wait(); - } catch (InterruptedException e) { - throw HyracksDataException.create(e); - } - } - if (failed.get()) { - return; - } - if (fileHandle == null) { - // fileHandle has been closed by the writer, create it again - fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - } - referenceCount++; - } - - @Override - public String toString() { - try { - ObjectMapper om = new ObjectMapper(); - ObjectNode on = om.createObjectNode(); - on.put("rspid", resultSetPartitionId.toString()); - on.put("async", asyncMode); - on.put("remainingReads", remainingReads); - on.put("eos", eos.get()); - on.put("failed", failed.get()); - on.put("fileRef", String.valueOf(fileRef)); - return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on); - } catch (JsonProcessingException e) { // NOSONAR - return e.getMessage(); - } - } - - public synchronized boolean isExhausted() { - return remainingReads == 0; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java index dfa5ff3..9bfd061 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java @@ -85,11 +85,11 @@ public class HeartbeatComputeTask extends TimerTask { hbData.netSignalingBytesRead = netPC.getSignalingBytesRead(); hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten(); - MuxDemuxPerformanceCounters datasetNetPC = ncs.getDatasetNetworkManager().getPerformanceCounters(); - hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead(); - hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten(); - hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead(); - hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten(); + MuxDemuxPerformanceCounters resultNetPC = ncs.getResultNetworkManager().getPerformanceCounters(); + hbData.resultNetPayloadBytesRead = resultNetPC.getPayloadBytesRead(); + hbData.resultNetPayloadBytesWritten = resultNetPC.getPayloadBytesWritten(); + hbData.resultNetSignalingBytesRead = resultNetPC.getSignalingBytesRead(); + hbData.resultNetSignalingBytesWritten = resultNetPC.getSignalingBytesWritten(); IPCPerformanceCounters ipcPC = ncs.getIpcSystem().getPerformanceCounters(); hbData.ipcMessagesSent = ipcPC.getMessageSentCount(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java deleted file mode 100644 index 5eba281..0000000 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.nc.net; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IChannelInterfaceFactory; -import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; -import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.dataset.IDatasetPartitionManager; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.exceptions.NetException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.comm.channels.IChannelConnectionFactory; -import org.apache.hyracks.comm.channels.NetworkOutputChannel; -import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; -import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener; -import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection; -import 
org.apache.hyracks.net.protocols.muxdemux.MuxDemux; -import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class DatasetNetworkManager implements IChannelConnectionFactory { - private static final Logger LOGGER = LogManager.getLogger(); - - private static final int MAX_CONNECTION_ATTEMPTS = 5; - - static final int INITIAL_MESSAGE_SIZE = 20; - - private final IDatasetPartitionManager partitionManager; - - private final MuxDemux md; - - private final int nBuffers; - - private NetworkAddress localNetworkAddress; - - private NetworkAddress publicNetworkAddress; - - /** - * @param inetAddress - * - Internet address to bind the listen port to - * @param inetPort - * - Port to bind on inetAddress - * @param publicInetAddress - * - Internet address to report to consumers; - * useful when behind NAT. null = same as inetAddress - * @param publicInetPort - * - Port to report to consumers; useful when - * behind NAT. Ignored if publicInetAddress is null. 
0 = same as inetPort - */ - public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager, - int nThreads, int nBuffers, String publicInetAddress, int publicInetPort, - IChannelInterfaceFactory channelInterfaceFactory) { - this.partitionManager = partitionManager; - this.nBuffers = nBuffers; - md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads, - MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory); - // Just save these values for the moment; may be reset in start() - publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort); - } - - public void start() throws IOException { - md.start(); - InetSocketAddress sockAddr = md.getLocalAddress(); - localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort()); - - // See if the public address was explicitly specified, and if not, - // make it a copy of localNetworkAddress - if (publicNetworkAddress.getAddress() == null) { - publicNetworkAddress = localNetworkAddress; - } else { - // Likewise for public port - if (publicNetworkAddress.getPort() == 0) { - publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort()); - } - } - } - - public NetworkAddress getLocalNetworkAddress() { - return localNetworkAddress; - } - - public NetworkAddress getPublicNetworkAddress() { - return publicNetworkAddress; - } - - public void stop() { - - } - - public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException { - MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress); - return mConn.openChannel(); - } - - private class ChannelOpenListener implements IChannelOpenListener { - @Override - public void channelOpened(ChannelControlBlock channel) { - channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel)); - 
channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE)); - } - } - - private class InitialBufferAcceptor implements ICloseableBufferAcceptor { - private final ChannelControlBlock ccb; - - private NetworkOutputChannel noc; - - public InitialBufferAcceptor(ChannelControlBlock ccb) { - this.ccb = ccb; - } - - @Override - public void accept(ByteBuffer buffer) { - JobId jobId = new JobId(buffer.getLong()); - ResultSetId rsId = new ResultSetId(buffer.getLong()); - int partition = buffer.getInt(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Received initial dataset partition read request for JobId: " + jobId + " partition: " - + partition + " on channel: " + ccb); - } - noc = new NetworkOutputChannel(ccb, nBuffers); - try { - partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc); - } catch (HyracksException e) { - noc.abort(); - } - } - - @Override - public void close() { - - } - - @Override - public void error(int ecode) { - if (noc != null) { - noc.abort(); - } - } - } - - public MuxDemuxPerformanceCounters getPerformanceCounters() { - return md.getPerformanceCounters(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java new file mode 100644 index 0000000..e56bfe6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.control.nc.net;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;

import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Network endpoint through which result partitions are read.
 * <p>
 * Every channel opened on the underlying {@link MuxDemux} is first handed a
 * {@link #INITIAL_MESSAGE_SIZE}-byte buffer. Once that buffer comes back full it is
 * decoded as a read request (job id, result set id, partition number) and the
 * {@link IResultPartitionManager} is asked to initialize a reader that writes to the channel.
 */
public class ResultNetworkManager implements IChannelConnectionFactory {
    private static final Logger LOGGER = LogManager.getLogger();

    // Passed through to the MuxDemux constructor.
    private static final int MAX_CONNECTION_ATTEMPTS = 5;

    // Size of the first message on a channel: two longs (8 + 8 bytes) followed by one int (4 bytes).
    static final int INITIAL_MESSAGE_SIZE = 20;

    private final MuxDemux md;

    private final IResultPartitionManager partitionManager;

    private final int nBuffers;

    private NetworkAddress publicNetworkAddress;

    private NetworkAddress localNetworkAddress;

    /**
     * Creates the manager; nothing is started until {@link #start()} is called.
     *
     * @param inetAddress
     *            - Internet address to bind the listen port to
     * @param inetPort
     *            - Port to bind on inetAddress
     * @param partitionManager
     *            - Manager asked to initialize a result partition reader for each incoming request
     * @param nThreads
     *            - Forwarded to the {@link MuxDemux} constructor
     * @param nBuffers
     *            - Forwarded to every {@link NetworkOutputChannel} created for a request
     * @param publicInetAddress
     *            - Internet address to report to consumers;
     *            useful when behind NAT. null = same as inetAddress
     * @param publicInetPort
     *            - Port to report to consumers; useful when
     *            behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
     * @param channelInterfaceFactory
     *            - Forwarded to the {@link MuxDemux} constructor
     */
    public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager,
            int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
            IChannelInterfaceFactory channelInterfaceFactory) {
        this.partitionManager = partitionManager;
        this.nBuffers = nBuffers;
        // Provisional value only: start() replaces it when the address or the port was left unspecified.
        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
    }

    /**
     * Starts the underlying {@link MuxDemux}, records the address it reports as local and
     * resolves the public address from it where the caller left it unspecified.
     *
     * @throws IOException
     *             propagated from {@link MuxDemux#start()}
     */
    public void start() throws IOException {
        md.start();
        final InetSocketAddress boundAddress = md.getLocalAddress();
        final int boundPort = boundAddress.getPort();
        localNetworkAddress = new NetworkAddress(boundAddress.getHostString(), boundPort);

        if (publicNetworkAddress.getAddress() == null) {
            // No explicit public address: advertise the local one as is.
            publicNetworkAddress = localNetworkAddress;
        } else if (publicNetworkAddress.getPort() == 0) {
            // Explicit public address without a port: pair it with the local port.
            publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), boundPort);
        }
    }

    /**
     * Currently a no-op.
     */
    public void stop() {

    }

    /**
     * @return the address recorded by {@link #start()}; {@code null} before {@link #start()} has run
     */
    public NetworkAddress getLocalNetworkAddress() {
        return localNetworkAddress;
    }

    /**
     * @return the address to report to consumers; only final once {@link #start()} has run
     */
    public NetworkAddress getPublicNetworkAddress() {
        return publicNetworkAddress;
    }

    /**
     * @return the performance counters of the underlying {@link MuxDemux}
     */
    public MuxDemuxPerformanceCounters getPerformanceCounters() {
        return md.getPerformanceCounters();
    }

    /**
     * Connects to {@code remoteAddress} and opens a new channel on that connection.
     *
     * @param remoteAddress
     *            - must be an {@link InetSocketAddress}; anything else fails the cast
     * @return the control block of the newly opened channel
     */
    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
        return md.connect((InetSocketAddress) remoteAddress).openChannel();
    }

    /**
     * Prepares every newly opened channel to receive the initial read request.
     */
    private class ChannelOpenListener implements IChannelOpenListener {
        @Override
        public void channelOpened(ChannelControlBlock channel) {
            // Register the acceptor first, then hand over the empty buffer the request is read into.
            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
        }
    }

    /**
     * Decodes the initial request of a channel and asks the partition manager to serve it.
     */
    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
        private final ChannelControlBlock ccb;

        // Stays null until a request has been accepted on this channel.
        private NetworkOutputChannel noc;

        public InitialBufferAcceptor(ChannelControlBlock ccb) {
            this.ccb = ccb;
        }

        @Override
        public void accept(ByteBuffer buffer) {
            // Field order in the buffer: job id (long), result set id (long), partition (int).
            final JobId jobId = new JobId(buffer.getLong());
            final ResultSetId rsId = new ResultSetId(buffer.getLong());
            final int partition = buffer.getInt();
            if (LOGGER.isDebugEnabled()) {
                final String message = "Received initial result partition read request for JobId: " + jobId
                        + " partition: " + partition + " on channel: " + ccb;
                LOGGER.debug(message);
            }
            noc = new NetworkOutputChannel(ccb, nBuffers);
            try {
                partitionManager.initializeResultPartitionReader(jobId, rsId, partition, noc);
            } catch (HyracksException e) {
                LOGGER.warn("Failed to initialize result partition reader", e);
                noc.abort();
            }
        }

        @Override
        public void close() {
            // Nothing to do.
        }

        @Override
        public void error(int ecode) {
            // An output channel exists only if a request was accepted before the error.
            if (noc == null) {
                return;
            }
            noc.abort();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java new file mode 100644 index 0000000..47230a6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.result; + +import java.nio.ByteBuffer; + +public class Page { + private final ByteBuffer buffer; + + public Page(ByteBuffer buffer) { + this.buffer = buffer; + } + + public ByteBuffer getBuffer() { + return buffer; + } + + public ByteBuffer clear() { + return (ByteBuffer) buffer.clear(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java new file mode 100644 index 0000000..34c9eb5 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.result; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.partitions.ResultSetPartitionId; + +public class ResultMemoryManager { + private int availableMemory; + + private final Set<Page> availPages; + + private final LeastRecentlyUsedList leastRecentlyUsedList; + + private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap; + + private final static int FRAME_SIZE = 32768; + + public ResultMemoryManager(int availableMemory) { + this.availableMemory = availableMemory; + + availPages = new HashSet<Page>(); + + // Atleast have one page for temporarily storing the results. + if (this.availableMemory <= FRAME_SIZE) { + this.availableMemory = FRAME_SIZE; + } + + leastRecentlyUsedList = new LeastRecentlyUsedList(); + resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>(); + } + + public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState) + throws HyracksDataException { + Page page; + if (availPages.isEmpty()) { + if (availableMemory >= FRAME_SIZE) { + /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's + * allocateFrame() instead of direct ByteBuffer.allocate()? + */ + availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE))); + availableMemory -= FRAME_SIZE; + page = getAvailablePage(); + } else { + page = evictPage(); + } + } else { + page = getAvailablePage(); + } + + page.clear(); + + /* + * It is extremely important to update the reference after obtaining the page because, in the cases where + * memory manager is allocated only one page of memory, the front of the LRU list should not be created by the + * update reference call before a page is pushed on to the element of the LRU list. 
So we first obtain the + * page, then make a updateReference call which in turn creates a new node in the LRU list and then add the + * page to it. + */ + PartitionNode pn = updateReference(resultSetPartitionId, resultState); + pn.add(page); + return page; + } + + public void pageReferenced(ResultSetPartitionId resultSetPartitionId) { + // When a page is referenced the result partition writer should already be known, so we pass null. + updateReference(resultSetPartitionId, null); + } + + public static int getPageSize() { + return FRAME_SIZE; + } + + protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) { + leastRecentlyUsedList.add(pn); + resultPartitionNodesMap.put(resultSetPartitionId, pn); + } + + protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) { + PartitionNode pn = null; + + if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) { + if (resultState != null) { + pn = new PartitionNode(resultSetPartitionId, resultState); + insertPartitionNode(resultSetPartitionId, pn); + } + return pn; + } + synchronized (this) { + pn = resultPartitionNodesMap.get(resultSetPartitionId); + leastRecentlyUsedList.remove(pn); + insertPartitionNode(resultSetPartitionId, pn); + } + + return pn; + } + + protected Page evictPage() throws HyracksDataException { + PartitionNode pn = leastRecentlyUsedList.getFirst(); + ResultState resultState = pn.getResultState(); + Page page = resultState.returnPage(); + + /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just + * take away all the pages allocated to it and add to the available pages set. 
+ */ + if (page == null) { + availPages.addAll(pn); + pn.clear(); + resultPartitionNodesMap.remove(pn.getResultSetPartitionId()); + leastRecentlyUsedList.remove(pn); + + /* Based on the assumption that if the result partition writer returned a null page, it should be lying + * about the number of pages it holds in which case we just evict all the pages it holds and should thus be + * able to add all those pages to available set and we have at least one page to allocate back. + */ + page = getAvailablePage(); + } else { + pn.remove(page); + + // If the partition no more holds any pages, remove it from the linked list and the hash map. + if (pn.isEmpty()) { + resultPartitionNodesMap.remove(pn.getResultSetPartitionId()); + leastRecentlyUsedList.remove(pn); + } + } + + return page; + } + + protected Page getAvailablePage() { + Iterator<Page> iter = availPages.iterator(); + Page page = iter.next(); + iter.remove(); + return page; + } + + private class LeastRecentlyUsedList { + private PartitionNode head; + + private PartitionNode tail; + + public LeastRecentlyUsedList() { + head = null; + tail = null; + } + + public void add(PartitionNode node) { + if (head == null) { + head = tail = node; + return; + } + tail.setNext(node); + node.setPrev(tail); + tail = node; + } + + public void remove(PartitionNode node) { + if ((node == head) && (node == tail)) { + head = tail = null; + return; + } else if (node == head) { + head = head.getNext(); + head.setPrev(null); + return; + } else if (node == tail) { + tail = tail.getPrev(); + tail.setNext(null); + return; + } else { + PartitionNode prev = node.getPrev(); + PartitionNode next = node.getNext(); + prev.setNext(next); + next.setPrev(prev); + } + } + + public PartitionNode getFirst() { + return head; + } + } + + private class PartitionNode extends HashSet<Page> { + private static final long serialVersionUID = 1L; + + private final ResultSetPartitionId resultSetPartitionId; + + private final ResultState resultState; + + private 
PartitionNode prev; + + private PartitionNode next; + + public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) { + this.resultSetPartitionId = resultSetPartitionId; + this.resultState = resultState; + prev = null; + next = null; + } + + public ResultSetPartitionId getResultSetPartitionId() { + return resultSetPartitionId; + } + + public ResultState getResultState() { + return resultState; + } + + public void setPrev(PartitionNode node) { + prev = node; + } + + public PartitionNode getPrev() { + return prev; + } + + public void setNext(PartitionNode node) { + next = node; + } + + public PartitionNode getNext() { + return next; + } + } +}