http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 804bacd..169e5ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -46,7 +46,6 @@ import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -57,6 +56,7 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.InvokeUtil;
@@ -74,15 +74,15 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.work.FutureValue;
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
-import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
 import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
 import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
 import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.net.ResultNetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.result.ResultPartitionManager;
 import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 import org.apache.hyracks.ipc.api.IIPCEventListener;
@@ -118,9 +118,9 @@ public class NodeControllerService implements IControllerService {
 
     private NetworkManager netManager;
 
-    private IDatasetPartitionManager datasetPartitionManager;
+    private IResultPartitionManager resultPartitionManager;
 
-    private DatasetNetworkManager datasetNetworkManager;
+    private ResultNetworkManager resultNetworkManager;
 
     private final WorkQueue workQueue;
 
@@ -262,10 +262,10 @@ public class NodeControllerService implements IControllerService {
     }
 
     private void init() {
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
+        resultPartitionManager = new ResultPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
                 ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
-        datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
-                ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(),
+        resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(),
+                ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(),
                 ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
                 FullFrameChannelInterfaceFactory.INSTANCE);
         if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) {
@@ -289,7 +289,7 @@ public class NodeControllerService implements IControllerService {
         netManager.start();
         startApplication();
         init();
-        datasetNetworkManager.start();
+        resultNetworkManager.start();
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
@@ -323,11 +323,11 @@ public class NodeControllerService implements IControllerService {
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
 
-        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+        NetworkAddress resultAddress = resultNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress messagingAddress =
                 messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
-        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, hbSchema,
+        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, resultAddress, hbSchema,
                 messagingAddress, application.getCapacity());
 
         ncData = new NodeControllerData(nodeRegistration);
@@ -494,9 +494,9 @@ public class NodeControllerService implements IControllerService {
                 LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown");
             }
             partitionManager.close();
-            datasetPartitionManager.close();
+            resultPartitionManager.close();
             netManager.stop();
-            datasetNetworkManager.stop();
+            resultNetworkManager.stop();
             if (messagingNetManager != null) {
                 messagingNetManager.stop();
             }
@@ -582,8 +582,8 @@ public class NodeControllerService implements IControllerService {
         return netManager;
     }
 
-    public DatasetNetworkManager getDatasetNetworkManager() {
-        return datasetNetworkManager;
+    public ResultNetworkManager getResultNetworkManager() {
+        return resultNetworkManager;
     }
 
     public PartitionManager getPartitionManager() {
@@ -645,8 +645,8 @@ public class NodeControllerService implements IControllerService {
         getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
     }
 
-    public IDatasetPartitionManager getDatasetPartitionManager() {
-        return datasetPartitionManager;
+    public IResultPartitionManager getResultPartitionManager() {
+        return resultPartitionManager;
     }
 
     public MessagingNetworkManager getMessagingNetworkManager() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 340924d..f6531d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.state.IStateObject;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -57,6 +56,7 @@ import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -427,8 +427,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     }
 
     @Override
-    public IDatasetPartitionManager getDatasetPartitionManager() {
-        return ncs.getDatasetPartitionManager();
+    public IResultPartitionManager getResultPartitionManager() {
+        return ncs.getResultPartitionManager();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java
deleted file mode 100644
index 37cddb8..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.partitions.ResultSetPartitionId;
-
-public class DatasetMemoryManager {
-    private int availableMemory;
-
-    private final Set<Page> availPages;
-
-    private final LeastRecentlyUsedList leastRecentlyUsedList;
-
-    private final Map<ResultSetPartitionId, PartitionNode> 
resultPartitionNodesMap;
-
-    private final static int FRAME_SIZE = 32768;
-
-    public DatasetMemoryManager(int availableMemory) {
-        this.availableMemory = availableMemory;
-
-        availPages = new HashSet<Page>();
-
-        // Atleast have one page for temporarily storing the results.
-        if (this.availableMemory <= FRAME_SIZE)
-            this.availableMemory = FRAME_SIZE;
-
-        leastRecentlyUsedList = new LeastRecentlyUsedList();
-        resultPartitionNodesMap = new HashMap<ResultSetPartitionId, 
PartitionNode>();
-    }
-
-    public synchronized Page requestPage(ResultSetPartitionId 
resultSetPartitionId, ResultState resultState)
-            throws HyracksDataException {
-        Page page;
-        if (availPages.isEmpty()) {
-            if (availableMemory >= FRAME_SIZE) {
-                /* TODO(madhusudancs): Should we have some way of accounting 
this memory usage by using Hyrack's allocateFrame()
-                 * instead of direct ByteBuffer.allocate()?
-                 */
-                availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
-                availableMemory -= FRAME_SIZE;
-                page = getAvailablePage();
-            } else {
-                page = evictPage();
-            }
-        } else {
-            page = getAvailablePage();
-        }
-
-        page.clear();
-
-        /*
-         * It is extremely important to update the reference after obtaining 
the page because, in the cases where memory
-         * manager is allocated only one page of memory, the front of the LRU 
list should not be created by the
-         * update reference call before a page is pushed on to the element of 
the LRU list. So we first obtain the page,
-         * then make a updateReference call which in turn creates a new node 
in the LRU list and then add the page to it.
-         */
-        PartitionNode pn = updateReference(resultSetPartitionId, resultState);
-        pn.add(page);
-        return page;
-    }
-
-    public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
-        // When a page is referenced the dataset partition writer should 
already be known, so we pass null.
-        updateReference(resultSetPartitionId, null);
-    }
-
-    public static int getPageSize() {
-        return FRAME_SIZE;
-    }
-
-    protected void insertPartitionNode(ResultSetPartitionId 
resultSetPartitionId, PartitionNode pn) {
-        leastRecentlyUsedList.add(pn);
-        resultPartitionNodesMap.put(resultSetPartitionId, pn);
-    }
-
-    protected PartitionNode updateReference(ResultSetPartitionId 
resultSetPartitionId, ResultState resultState) {
-        PartitionNode pn = null;
-
-        if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
-            if (resultState != null) {
-                pn = new PartitionNode(resultSetPartitionId, resultState);
-                insertPartitionNode(resultSetPartitionId, pn);
-            }
-            return pn;
-        }
-        synchronized (this) {
-            pn = resultPartitionNodesMap.get(resultSetPartitionId);
-            leastRecentlyUsedList.remove(pn);
-            insertPartitionNode(resultSetPartitionId, pn);
-        }
-
-        return pn;
-    }
-
-    protected Page evictPage() throws HyracksDataException {
-        PartitionNode pn = leastRecentlyUsedList.getFirst();
-        ResultState resultState = pn.getResultState();
-        Page page = resultState.returnPage();
-
-        /* If the partition holding the pages breaks the contract by not 
returning the page or it has no page, just take
-         * away all the pages allocated to it and add to the available pages 
set.
-         */
-        if (page == null) {
-            availPages.addAll(pn);
-            pn.clear();
-            resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
-            leastRecentlyUsedList.remove(pn);
-
-            /* Based on the assumption that if the dataset partition writer 
returned a null page, it should be lying about
-             * the number of pages it holds in which case we just evict all 
the pages it holds and should thus be able to
-             * add all those pages to available set and we have at least one 
page to allocate back.
-             */
-            page = getAvailablePage();
-        } else {
-            pn.remove(page);
-
-            // If the partition no more holds any pages, remove it from the 
linked list and the hash map.
-            if (pn.isEmpty()) {
-                resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
-                leastRecentlyUsedList.remove(pn);
-            }
-        }
-
-        return page;
-    }
-
-    protected Page getAvailablePage() {
-        Iterator<Page> iter = availPages.iterator();
-        Page page = iter.next();
-        iter.remove();
-        return page;
-    }
-
-    private class LeastRecentlyUsedList {
-        private PartitionNode head;
-
-        private PartitionNode tail;
-
-        public LeastRecentlyUsedList() {
-            head = null;
-            tail = null;
-        }
-
-        public void add(PartitionNode node) {
-            if (head == null) {
-                head = tail = node;
-                return;
-            }
-            tail.setNext(node);
-            node.setPrev(tail);
-            tail = node;
-        }
-
-        public void remove(PartitionNode node) {
-            if ((node == head) && (node == tail)) {
-                head = tail = null;
-                return;
-            } else if (node == head) {
-                head = head.getNext();
-                head.setPrev(null);
-                return;
-            } else if (node == tail) {
-                tail = tail.getPrev();
-                tail.setNext(null);
-                return;
-            } else {
-                PartitionNode prev = node.getPrev();
-                PartitionNode next = node.getNext();
-                prev.setNext(next);
-                next.setPrev(prev);
-            }
-        }
-
-        public PartitionNode getFirst() {
-            return head;
-        }
-    }
-
-    private class PartitionNode extends HashSet<Page> {
-        private static final long serialVersionUID = 1L;
-
-        private final ResultSetPartitionId resultSetPartitionId;
-
-        private final ResultState resultState;
-
-        private PartitionNode prev;
-
-        private PartitionNode next;
-
-        public PartitionNode(ResultSetPartitionId resultSetPartitionId, 
ResultState resultState) {
-            this.resultSetPartitionId = resultSetPartitionId;
-            this.resultState = resultState;
-            prev = null;
-            next = null;
-        }
-
-        public ResultSetPartitionId getResultSetPartitionId() {
-            return resultSetPartitionId;
-        }
-
-        public ResultState getResultState() {
-            return resultState;
-        }
-
-        public void setPrev(PartitionNode node) {
-            prev = node;
-        }
-
-        public PartitionNode getPrev() {
-            return prev;
-        }
-
-        public void setNext(PartitionNode node) {
-            next = node;
-        }
-
-        public PartitionNode getNext() {
-            return next;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
deleted file mode 100644
index b7cf9a4..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
-import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
-import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class DatasetPartitionManager extends AbstractDatasetManager implements 
IDatasetPartitionManager {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final NodeControllerService ncs;
-
-    private final Executor executor;
-
-    private final Map<JobId, ResultSetMap> partitionResultStateMap;
-
-    private final DefaultDeallocatableRegistry deallocatableRegistry;
-
-    private final IWorkspaceFileFactory fileFactory;
-
-    private final DatasetMemoryManager datasetMemoryManager;
-
-    public DatasetPartitionManager(NodeControllerService ncs, Executor 
executor, int availableMemory, long resultTTL,
-            long resultSweepThreshold) {
-        super(resultTTL);
-        this.ncs = ncs;
-        this.executor = executor;
-        deallocatableRegistry = new DefaultDeallocatableRegistry();
-        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, 
ncs.getIoManager());
-        if (availableMemory >= DatasetMemoryManager.getPageSize()) {
-            datasetMemoryManager = new DatasetMemoryManager(availableMemory);
-        } else {
-            datasetMemoryManager = null;
-        }
-        partitionResultStateMap = new HashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, 
LOGGER));
-    }
-
-    @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, boolean orderedResult,
-            boolean asyncMode, int partition, int nPartitions, long maxReads) {
-        DatasetPartitionWriter dpw;
-        JobId jobId = ctx.getJobletContext().getJobId();
-        synchronized (this) {
-            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, 
asyncMode, orderedResult, partition, nPartitions,
-                    datasetMemoryManager, fileFactory, maxReads);
-            ResultSetMap rsIdMap = 
partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
-            ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, 
nPartitions);
-            resultStates[partition] = dpw.getResultState();
-        }
-        LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", 
jobId, partition);
-        return dpw;
-    }
-
-    @Override
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
int partition, int nPartitions,
-            boolean orderedResult, boolean emptyResult) throws 
HyracksException {
-        try {
-            // Be sure to send the *public* network address to the CC
-            
ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId,
 rsId, orderedResult,
-                    emptyResult, partition, nPartitions, 
ncs.getDatasetNetworkManager().getPublicNetworkAddress());
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    @Override
-    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, 
int partition) throws HyracksException {
-        try {
-            LOGGER.debug("Reporting partition write completion: JobId: 
{}:ResultSetId: {}:partition: {}", jobId, rsId,
-                    partition);
-            
ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId,
 rsId, partition);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
-    @Override
-    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition,
-            IFrameWriter writer) throws HyracksException {
-        ResultState resultState = getResultState(jobId, resultSetId, 
partition);
-        DatasetPartitionReader dpr = new DatasetPartitionReader(this, 
datasetMemoryManager, executor, resultState);
-        dpr.writeTo(writer);
-        LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: 
{}:partition: {}", jobId, resultSetId,
-                partition);
-    }
-
-    private synchronized ResultState getResultState(JobId jobId, ResultSetId 
resultSetId, int partition)
-            throws HyracksException {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
-        if (rsIdMap == null) {
-            throw new HyracksException("Unknown JobId " + jobId);
-        }
-        ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
-        if (resultStates == null) {
-            throw new HyracksException("Unknown JobId: " + jobId + " 
ResultSetId: " + resultSetId);
-        }
-        ResultState resultState = resultStates[partition];
-        if (resultState == null) {
-            throw new HyracksException("No DatasetPartitionWriter for 
partition " + partition);
-        }
-        return resultState;
-    }
-
-    @Override
-    public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition) {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
-        if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, 
partition)) {
-            partitionResultStateMap.remove(jobId);
-        }
-    }
-
-    @Override
-    public synchronized void abortReader(JobId jobId) {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
-        if (rsIdMap != null) {
-            rsIdMap.abortAll();
-        }
-    }
-
-    @Override
-    public synchronized void close() {
-        for (JobId jobId : getJobIds()) {
-            deinit(jobId);
-        }
-        deallocatableRegistry.close();
-    }
-
-    @Override
-    public synchronized Set<JobId> getJobIds() {
-        return partitionResultStateMap.keySet();
-    }
-
-    @Override
-    public synchronized ResultSetMap getState(JobId jobId) {
-        return partitionResultStateMap.get(jobId);
-    }
-
-    @Override
-    public synchronized void sweep(JobId jobId) {
-        deinit(jobId);
-        partitionResultStateMap.remove(jobId);
-    }
-
-    private synchronized void deinit(JobId jobId) {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
-        if (rsIdMap != null) {
-            rsIdMap.closeAndDeleteAll();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
deleted file mode 100644
index 8c4fcb0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.partitions.ResultSetPartitionId;
-import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class DatasetPartitionReader {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final DatasetPartitionManager datasetPartitionManager;
-    private final DatasetMemoryManager datasetMemoryManager;
-    private final Executor executor;
-    private final ResultState resultState;
-
-    public DatasetPartitionReader(DatasetPartitionManager 
datasetPartitionManager,
-            DatasetMemoryManager datasetMemoryManager, Executor executor, 
ResultState resultState) {
-        this.datasetPartitionManager = datasetPartitionManager;
-        this.datasetMemoryManager = datasetMemoryManager;
-        this.executor = executor;
-        this.resultState = resultState;
-    }
-
-    public void writeTo(final IFrameWriter writer) {
-        executor.execute(new ResultPartitionSender((NetworkOutputChannel) 
writer));
-    }
-
-    private class ResultPartitionSender implements Runnable {
-
-        private final NetworkOutputChannel channel;
-
-        ResultPartitionSender(final NetworkOutputChannel channel) {
-            this.channel = channel;
-        }
-
-        @Override
-        public void run() {
-            channel.setFrameSize(resultState.getFrameSize());
-            channel.open();
-            try {
-                resultState.readOpen();
-                long offset = 0;
-                final ByteBuffer buffer = 
ByteBuffer.allocate(resultState.getFrameSize());
-                while (true) {
-                    buffer.clear();
-                    final long size = read(offset, buffer);
-                    if (size <= 0) {
-                        break;
-                    } else if (size < buffer.limit()) {
-                        throw new IllegalStateException(
-                                "Premature end of file - readSize: " + size + 
" buffer limit: " + buffer.limit());
-                    }
-                    offset += size;
-                    buffer.flip();
-                    channel.nextFrame(buffer);
-                }
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("result reading successful(" + 
resultState.getResultSetPartitionId() + ")");
-                }
-            } catch (Exception e) {
-                LOGGER.error(() -> "failed to send result partition " + 
resultState.getResultSetPartitionId(), e);
-                channel.abort();
-            } finally {
-                close();
-            }
-        }
-
-        private long read(long offset, ByteBuffer buffer) throws 
HyracksDataException {
-            return datasetMemoryManager != null ? 
resultState.read(datasetMemoryManager, offset, buffer)
-                    : resultState.read(offset, buffer);
-        }
-
-        private void close() {
-            try {
-                channel.close();
-                resultState.readClose();
-                if (resultState.isExhausted()) {
-                    final ResultSetPartitionId partitionId = 
resultState.getResultSetPartitionId();
-                    
datasetPartitionManager.removePartition(partitionId.getJobId(), 
partitionId.getResultSetId(),
-                            partitionId.getPartition());
-                }
-            } catch (HyracksDataException e) {
-                LOGGER.error("unexpected failure in partition reader clean 
up", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
deleted file mode 100644
index b593bb5..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.partitions.ResultSetPartitionId;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class DatasetPartitionWriter implements IFrameWriter {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final IDatasetPartitionManager manager;
-
-    private final JobId jobId;
-
-    private final ResultSetId resultSetId;
-
-    private final boolean orderedResult;
-
-    private final int partition;
-
-    private final int nPartitions;
-
-    private final DatasetMemoryManager datasetMemoryManager;
-
-    private final ResultSetPartitionId resultSetPartitionId;
-
-    private final ResultState resultState;
-
-    private boolean partitionRegistered;
-
-    private boolean failed = false;
-
-    public DatasetPartitionWriter(IHyracksTaskContext ctx, 
IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int 
partition, int nPartitions,
-            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory 
fileFactory, long maxReads) {
-        this.manager = manager;
-        this.jobId = jobId;
-        this.resultSetId = rsId;
-        this.orderedResult = orderedResult;
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.datasetMemoryManager = datasetMemoryManager;
-
-        resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, 
partition);
-        resultState = new ResultState(resultSetPartitionId, asyncMode, 
ctx.getIoManager(), fileFactory,
-                ctx.getInitialFrameSize(), maxReads);
-    }
-
-    public ResultState getResultState() {
-        return resultState;
-    }
-
-    @Override
-    public void open() {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("open(" + partition + ")");
-        }
-        partitionRegistered = false;
-        resultState.open();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        registerResultPartitionLocation(false);
-        if (datasetMemoryManager == null) {
-            resultState.write(buffer);
-        } else {
-            resultState.write(datasetMemoryManager, buffer);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        resultState.closeAndDelete();
-        resultState.abort();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("close(" + partition + ")");
-        }
-        try {
-            if (!failed) {
-                registerResultPartitionLocation(true);
-            }
-        } finally {
-            resultState.close();
-        }
-        try {
-            if (partitionRegistered) {
-                manager.reportPartitionWriteCompletion(jobId, resultSetId, 
partition);
-            }
-        } catch (HyracksException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    void registerResultPartitionLocation(boolean empty) throws 
HyracksDataException {
-        try {
-            if (!partitionRegistered) {
-                manager.registerResultPartitionLocation(jobId, resultSetId, 
partition, nPartitions, orderedResult,
-                        empty);
-                partitionRegistered = true;
-            }
-        } catch (HyracksException e) {
-            if (e instanceof HyracksDataException) {
-                throw (HyracksDataException) e;
-            } else {
-                throw HyracksDataException.create(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
deleted file mode 100644
index 2eb33fd..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/Page.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-
-public class Page {
-    private final ByteBuffer buffer;
-
-    public Page(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
-
-    public ByteBuffer clear() {
-        return (ByteBuffer) buffer.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
deleted file mode 100644
index 1a64a5a..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-class ResultSetMap implements IDatasetStateRecord, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private final long timestamp;
-    private final HashMap<ResultSetId, ResultState[]> resultStateMap;
-
-    ResultSetMap() {
-        timestamp = System.nanoTime();
-        resultStateMap = new HashMap<>();
-    }
-
-    @Override
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    ResultState[] getResultStates(ResultSetId rsId) {
-        return resultStateMap.get(rsId);
-    }
-
-    ResultState[] createOrGetResultStates(ResultSetId rsId, int nPartitions) {
-        return resultStateMap.computeIfAbsent(rsId, (k) -> new 
ResultState[nPartitions]);
-    }
-
-    /**
-     * removes a result partition for a result set
-     *
-     * @param jobId
-     *            the id of the job that produced the result set
-     * @param resultSetId
-     *            the id of the result set
-     * @param partition
-     *            the partition number
-     * @return true, if all partitions for the resultSetId have been removed
-     */
-    boolean removePartition(JobId jobId, ResultSetId resultSetId, int 
partition) {
-        final ResultState[] resultStates = resultStateMap.get(resultSetId);
-        if (resultStates != null) {
-            final ResultState state = resultStates[partition];
-            if (state != null) {
-                state.closeAndDelete();
-                LOGGER.debug("Removing partition: " + partition + " for JobId: 
" + jobId);
-            }
-            resultStates[partition] = null;
-            boolean stateEmpty = true;
-            for (ResultState resState : resultStates) {
-                if (resState != null) {
-                    stateEmpty = false;
-                    break;
-                }
-            }
-            if (stateEmpty) {
-                resultStateMap.remove(resultSetId);
-            }
-            return resultStateMap.isEmpty();
-        }
-        return true;
-    }
-
-    void abortAll() {
-        applyToAllStates((rsId, state, i) -> state.abort());
-    }
-
-    void closeAndDeleteAll() {
-        applyToAllStates((rsId, state, i) -> {
-            state.closeAndDelete();
-            LOGGER.debug("Removing partition: " + i + " for result set " + 
rsId);
-        });
-    }
-
-    @FunctionalInterface
-    private interface StateModifier {
-        void modify(ResultSetId rsId, ResultState entry, int partition);
-    }
-
-    private void applyToAllStates(StateModifier modifier) {
-        for (Map.Entry<ResultSetId, ResultState[]> entry : 
resultStateMap.entrySet()) {
-            final ResultSetId rsId = entry.getKey();
-            final ResultState[] resultStates = entry.getValue();
-            if (resultStates == null) {
-                continue;
-            }
-            for (int i = 0; i < resultStates.length; i++) {
-                final ResultState state = resultStates[i];
-                if (state != null) {
-                    modifier.modify(rsId, state, i);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
deleted file mode 100644
index 6b35912..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.dataset;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hyracks.api.dataflow.state.IStateObject;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.partitions.ResultSetPartitionId;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class ResultState implements IStateObject {
-    private static final String FILE_PREFIX = "result_";
-
-    private final ResultSetPartitionId resultSetPartitionId;
-
-    private final boolean asyncMode;
-
-    private final int frameSize;
-
-    private final IIOManager ioManager;
-
-    private final IWorkspaceFileFactory fileFactory;
-
-    private final AtomicBoolean eos;
-
-    private final AtomicBoolean failed;
-
-    private final List<Page> localPageList;
-
-    private FileReference fileRef;
-
-    private IFileHandle fileHandle;
-
-    private volatile int referenceCount = 0;
-
-    private long size;
-
-    private long persistentSize;
-    private long remainingReads;
-
-    ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, 
IIOManager ioManager,
-            IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
-        if (maxReads <= 0) {
-            throw new IllegalArgumentException("maxReads must be > 0");
-        }
-        this.resultSetPartitionId = resultSetPartitionId;
-        this.asyncMode = asyncMode;
-        this.ioManager = ioManager;
-        this.fileFactory = fileFactory;
-        this.frameSize = frameSize;
-        remainingReads = maxReads;
-        eos = new AtomicBoolean(false);
-        failed = new AtomicBoolean(false);
-        localPageList = new ArrayList<>();
-
-        fileRef = null;
-        fileHandle = null;
-    }
-
-    public synchronized void open() {
-        size = 0;
-        persistentSize = 0;
-        referenceCount = 0;
-    }
-
-    public synchronized void close() {
-        eos.set(true);
-        closeWriteFileHandle();
-        notifyAll();
-    }
-
-    public synchronized void closeAndDelete() {
-        // Deleting a job is equivalent to aborting the job for all practical 
purposes, so the same action, needs
-        // to be taken when there are more requests to these result states.
-        failed.set(true);
-        closeWriteFileHandle();
-        if (fileRef != null) {
-            fileRef.delete();
-            fileRef = null;
-        }
-    }
-
-    private void closeWriteFileHandle() {
-        if (fileHandle != null) {
-            doCloseFileHandle();
-        }
-    }
-
-    private void doCloseFileHandle() {
-        if (--referenceCount == 0) {
-            // close the file if there is no more reference
-            try {
-                ioManager.close(fileHandle);
-            } catch (IOException e) {
-                // Since file handle could not be closed, just ignore.
-            }
-            fileHandle = null;
-        }
-    }
-
-    public synchronized void write(ByteBuffer buffer) throws 
HyracksDataException {
-        if (fileRef == null) {
-            initWriteFileHandle();
-        }
-
-        size += ioManager.syncWrite(fileHandle, size, buffer);
-        notifyAll();
-    }
-
-    public synchronized void write(DatasetMemoryManager datasetMemoryManager, 
ByteBuffer buffer)
-            throws HyracksDataException {
-        int srcOffset = 0;
-        Page destPage = null;
-
-        if (!localPageList.isEmpty()) {
-            destPage = localPageList.get(localPageList.size() - 1);
-        }
-
-        while (srcOffset < buffer.limit()) {
-            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) 
{
-                destPage = 
datasetMemoryManager.requestPage(resultSetPartitionId, this);
-                localPageList.add(destPage);
-            }
-            int srcLength = Math.min(buffer.limit() - srcOffset, 
destPage.getBuffer().remaining());
-            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
-            srcOffset += srcLength;
-            size += srcLength;
-        }
-
-        notifyAll();
-    }
-
-    public synchronized void readOpen() {
-        if (isExhausted()) {
-            throw new IllegalStateException("Result reads exhausted");
-        }
-        remainingReads--;
-    }
-
-    public synchronized void readClose() throws HyracksDataException {
-        if (fileHandle != null) {
-            doCloseFileHandle();
-        }
-    }
-
-    public synchronized long read(long offset, ByteBuffer buffer) throws 
HyracksDataException {
-        long readSize = 0;
-
-        while (offset >= size && !eos.get() && !failed.get()) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-        if ((offset >= size && eos.get()) || failed.get()) {
-            return readSize;
-        }
-
-        if (fileHandle == null) {
-            initReadFileHandle();
-        }
-        readSize = ioManager.syncRead(fileHandle, offset, buffer);
-
-        return readSize;
-    }
-
-    public synchronized long read(DatasetMemoryManager datasetMemoryManager, 
long offset, ByteBuffer buffer)
-            throws HyracksDataException {
-        long readSize = 0;
-        while (offset >= size && !eos.get() && !failed.get()) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-
-        if ((offset >= size && eos.get()) || failed.get()) {
-            return readSize;
-        }
-
-        if (offset < persistentSize) {
-            if (fileHandle == null) {
-                initReadFileHandle();
-            }
-            readSize = ioManager.syncRead(fileHandle, offset, buffer);
-            if (readSize < 0) {
-                throw new HyracksDataException("Premature end of file");
-            }
-        }
-
-        if (readSize < buffer.capacity()) {
-            long localPageOffset = offset - persistentSize;
-            int localPageIndex = (int) (localPageOffset / 
DatasetMemoryManager.getPageSize());
-            int pageOffset = (int) (localPageOffset % 
DatasetMemoryManager.getPageSize());
-            Page page = getPage(localPageIndex);
-            if (page == null) {
-                return readSize;
-            }
-            readSize += buffer.remaining();
-            buffer.put(page.getBuffer().array(), pageOffset, 
buffer.remaining());
-        }
-        datasetMemoryManager.pageReferenced(resultSetPartitionId);
-        return readSize;
-    }
-
-    public synchronized void abort() {
-        failed.set(true);
-        notifyAll();
-    }
-
-    public synchronized Page returnPage() throws HyracksDataException {
-        Page page = removePage();
-
-        // If we do not have any pages to be given back close the write 
channel since we don't write any more, return null.
-        if (page == null) {
-            ioManager.close(fileHandle);
-            return null;
-        }
-
-        page.getBuffer().flip();
-
-        if (fileRef == null) {
-            initWriteFileHandle();
-        }
-
-        long delta = ioManager.syncWrite(fileHandle, persistentSize, 
page.getBuffer());
-        persistentSize += delta;
-        return page;
-    }
-
-    public synchronized void setEOS(boolean eos) {
-        this.eos.set(eos);
-    }
-
-    public ResultSetPartitionId getResultSetPartitionId() {
-        return resultSetPartitionId;
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    public IIOManager getIOManager() {
-        return ioManager;
-    }
-
-    public boolean getAsyncMode() {
-        return asyncMode;
-    }
-
-    @Override
-    public JobId getJobId() {
-        return resultSetPartitionId.getJobId();
-    }
-
-    @Override
-    public Object getId() {
-        return resultSetPartitionId;
-    }
-
-    @Override
-    public long getMemoryOccupancy() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void toBytes(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void fromBytes(DataInput in) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    private Page getPage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(index);
-        }
-        return page;
-    }
-
-    private Page removePage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.remove(localPageList.size() - 1);
-        }
-        return page;
-    }
-
-    private void initWriteFileHandle() throws HyracksDataException {
-        if (fileHandle == null) {
-            String fName = FILE_PREFIX + 
String.valueOf(resultSetPartitionId.getPartition());
-            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
-            fileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_WRITE,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-            if (referenceCount != 0) {
-                throw new IllegalStateException("Illegal reference count " + 
referenceCount);
-            }
-            referenceCount = 1;
-            notifyAll(); // NOSONAR: always called from a synchronized block
-        }
-    }
-
-    private void initReadFileHandle() throws HyracksDataException {
-        while (fileRef == null && !failed.get()) {
-            // wait for writer to create the file
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-        if (failed.get()) {
-            return;
-        }
-        if (fileHandle == null) {
-            // fileHandle has been closed by the writer, create it again
-            fileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_ONLY,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        }
-        referenceCount++;
-    }
-
-    @Override
-    public String toString() {
-        try {
-            ObjectMapper om = new ObjectMapper();
-            ObjectNode on = om.createObjectNode();
-            on.put("rspid", resultSetPartitionId.toString());
-            on.put("async", asyncMode);
-            on.put("remainingReads", remainingReads);
-            on.put("eos", eos.get());
-            on.put("failed", failed.get());
-            on.put("fileRef", String.valueOf(fileRef));
-            return om.writer(new 
MinimalPrettyPrinter()).writeValueAsString(on);
-        } catch (JsonProcessingException e) { // NOSONAR
-            return e.getMessage();
-        }
-    }
-
-    public synchronized boolean isExhausted() {
-        return remainingReads == 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
index dfa5ff3..9bfd061 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -85,11 +85,11 @@ public class HeartbeatComputeTask extends TimerTask {
             hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
             hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
 
-            MuxDemuxPerformanceCounters datasetNetPC = 
ncs.getDatasetNetworkManager().getPerformanceCounters();
-            hbData.datasetNetPayloadBytesRead = 
datasetNetPC.getPayloadBytesRead();
-            hbData.datasetNetPayloadBytesWritten = 
datasetNetPC.getPayloadBytesWritten();
-            hbData.datasetNetSignalingBytesRead = 
datasetNetPC.getSignalingBytesRead();
-            hbData.datasetNetSignalingBytesWritten = 
datasetNetPC.getSignalingBytesWritten();
+            MuxDemuxPerformanceCounters resultNetPC = 
ncs.getResultNetworkManager().getPerformanceCounters();
+            hbData.resultNetPayloadBytesRead = 
resultNetPC.getPayloadBytesRead();
+            hbData.resultNetPayloadBytesWritten = 
resultNetPC.getPayloadBytesWritten();
+            hbData.resultNetSignalingBytesRead = 
resultNetPC.getSignalingBytesRead();
+            hbData.resultNetSignalingBytesWritten = 
resultNetPC.getSignalingBytesWritten();
 
             IPCPerformanceCounters ipcPC = 
ncs.getIpcSystem().getPerformanceCounters();
             hbData.ipcMessagesSent = ipcPC.getMessageSentCount();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
deleted file mode 100644
index 5eba281..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
-import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.exceptions.NetException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
-import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
-import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
-import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
-import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class DatasetNetworkManager implements IChannelConnectionFactory {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private static final int MAX_CONNECTION_ATTEMPTS = 5;
-
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
-    private final IDatasetPartitionManager partitionManager;
-
-    private final MuxDemux md;
-
-    private final int nBuffers;
-
-    private NetworkAddress localNetworkAddress;
-
-    private NetworkAddress publicNetworkAddress;
-
-    /**
-     * @param inetAddress
-     *            - Internet address to bind the listen port to
-     * @param inetPort
-     *            - Port to bind on inetAddress
-     * @param publicInetAddress
-     *            - Internet address to report to consumers;
-     *            useful when behind NAT. null = same as inetAddress
-     * @param publicInetPort
-     *            - Port to report to consumers; useful when
-     *            behind NAT. Ignored if publicInetAddress is null. 0 = same 
as inetPort
-     */
-    public DatasetNetworkManager(String inetAddress, int inetPort, 
IDatasetPartitionManager partitionManager,
-            int nThreads, int nBuffers, String publicInetAddress, int 
publicInetPort,
-            IChannelInterfaceFactory channelInterfaceFactory) {
-        this.partitionManager = partitionManager;
-        this.nBuffers = nBuffers;
-        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new 
ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
-        // Just save these values for the moment; may be reset in start()
-        publicNetworkAddress = new NetworkAddress(publicInetAddress, 
publicInetPort);
-    }
-
-    public void start() throws IOException {
-        md.start();
-        InetSocketAddress sockAddr = md.getLocalAddress();
-        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), 
sockAddr.getPort());
-
-        // See if the public address was explicitly specified, and if not,
-        // make it a copy of localNetworkAddress
-        if (publicNetworkAddress.getAddress() == null) {
-            publicNetworkAddress = localNetworkAddress;
-        } else {
-            // Likewise for public port
-            if (publicNetworkAddress.getPort() == 0) {
-                publicNetworkAddress = new 
NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
-            }
-        }
-    }
-
-    public NetworkAddress getLocalNetworkAddress() {
-        return localNetworkAddress;
-    }
-
-    public NetworkAddress getPublicNetworkAddress() {
-        return publicNetworkAddress;
-    }
-
-    public void stop() {
-
-    }
-
-    public ChannelControlBlock connect(SocketAddress remoteAddress) throws 
InterruptedException, NetException {
-        MultiplexedConnection mConn = md.connect((InetSocketAddress) 
remoteAddress);
-        return mConn.openChannel();
-    }
-
-    private class ChannelOpenListener implements IChannelOpenListener {
-        @Override
-        public void channelOpened(ChannelControlBlock channel) {
-            channel.getReadInterface().setFullBufferAcceptor(new 
InitialBufferAcceptor(channel));
-            
channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
-        }
-    }
-
-    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
-        private final ChannelControlBlock ccb;
-
-        private NetworkOutputChannel noc;
-
-        public InitialBufferAcceptor(ChannelControlBlock ccb) {
-            this.ccb = ccb;
-        }
-
-        @Override
-        public void accept(ByteBuffer buffer) {
-            JobId jobId = new JobId(buffer.getLong());
-            ResultSetId rsId = new ResultSetId(buffer.getLong());
-            int partition = buffer.getInt();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Received initial dataset partition read request 
for JobId: " + jobId + " partition: "
-                        + partition + " on channel: " + ccb);
-            }
-            noc = new NetworkOutputChannel(ccb, nBuffers);
-            try {
-                partitionManager.initializeDatasetPartitionReader(jobId, rsId, 
partition, noc);
-            } catch (HyracksException e) {
-                noc.abort();
-            }
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public void error(int ecode) {
-            if (noc != null) {
-                noc.abort();
-            }
-        }
-    }
-
-    public MuxDemuxPerformanceCounters getPerformanceCounters() {
-        return md.getPerformanceCounters();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
new file mode 100644
index 0000000..e56bfe6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Network endpoint that serves result partition read requests. It wraps a {@link MuxDemux} listening on the
+ * configured address; the first buffer received on every opened channel is decoded and passed on to the
+ * {@link IResultPartitionManager}.
+ */
+public class ResultNetworkManager implements IChannelConnectionFactory {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+    // Initial request layout: job id (long, 8 bytes) + result set id (long, 8 bytes) + partition (int, 4 bytes).
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IResultPartitionManager partitionManager;
+
+    private final MuxDemux md;
+
+    private final int nBuffers;
+
+    private NetworkAddress localNetworkAddress;
+
+    private NetworkAddress publicNetworkAddress;
+
+    /**
+     * @param inetAddress
+     *            - Internet address to bind the listen port to
+     * @param inetPort
+     *            - Port to bind on inetAddress
+     * @param partitionManager
+     *            - Receives the read request decoded from each newly opened channel
+     * @param nThreads
+     *            - Thread count handed to the underlying {@link MuxDemux}
+     * @param nBuffers
+     *            - Buffer count handed to every {@link NetworkOutputChannel} created for a request
+     * @param publicInetAddress
+     *            - Internet address to report to consumers;
+     *            useful when behind NAT. null = same as inetAddress
+     * @param publicInetPort
+     *            - Port to report to consumers; useful when
+     *            behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
+     * @param channelInterfaceFactory
+     *            - Factory handed to the underlying {@link MuxDemux}
+     */
+    public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager,
+            int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
+            IChannelInterfaceFactory channelInterfaceFactory) {
+        this.partitionManager = partitionManager;
+        this.nBuffers = nBuffers;
+        final InetSocketAddress listenAddress = new InetSocketAddress(inetAddress, inetPort);
+        this.md = new MuxDemux(listenAddress, new ChannelOpenListener(), nThreads, MAX_CONNECTION_ATTEMPTS,
+                channelInterfaceFactory);
+        // Provisional value only; start() fills in whatever part was left unspecified.
+        this.publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
+    }
+
+    /**
+     * Starts the underlying {@link MuxDemux}, records the address it actually bound to and finalizes the public
+     * address: a missing public host falls back to the local address, a public port of 0 falls back to the bound
+     * port.
+     *
+     * @throws IOException
+     *             if starting the {@link MuxDemux} fails
+     */
+    public void start() throws IOException {
+        md.start();
+        final InetSocketAddress boundAddress = md.getLocalAddress();
+        final int boundPort = boundAddress.getPort();
+        localNetworkAddress = new NetworkAddress(boundAddress.getHostString(), boundPort);
+
+        if (publicNetworkAddress.getAddress() == null) {
+            // No public address was given: advertise the local one.
+            publicNetworkAddress = localNetworkAddress;
+        } else if (publicNetworkAddress.getPort() == 0) {
+            // Public host was given without a port: advertise it together with the bound port.
+            publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), boundPort);
+        }
+    }
+
+    /**
+     * @return the address the listener is bound to; only set once {@link #start()} has run
+     */
+    public NetworkAddress getLocalNetworkAddress() {
+        return localNetworkAddress;
+    }
+
+    /**
+     * @return the address reported to consumers; finalized by {@link #start()}
+     */
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
+    }
+
+    /**
+     * Currently does nothing.
+     */
+    public void stop() {
+
+    }
+
+    /**
+     * Connects to the given remote address and opens a channel on that connection.
+     *
+     * @param remoteAddress
+     *            - Address to connect to; it is cast to {@link InetSocketAddress}
+     * @return the newly opened channel
+     */
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        return md.connect((InetSocketAddress) remoteAddress).openChannel();
+    }
+
+    /**
+     * Prepares every newly opened channel to receive its initial request message.
+     */
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            // Supply the one empty buffer the initial request will be read into.
+            ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+            channel.getReadInterface().getEmptyBufferAcceptor().accept(requestBuffer);
+        }
+    }
+
+    /**
+     * Decodes the initial request of a channel and asks the partition manager to serve it over that channel.
+     */
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        // Stays null until a request has been accepted.
+        private NetworkOutputChannel noc;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // The read order defines the wire layout: job id, result set id, partition.
+            final long rawJobId = buffer.getLong();
+            final long rawResultSetId = buffer.getLong();
+            final int partition = buffer.getInt();
+            final JobId jobId = new JobId(rawJobId);
+            final ResultSetId rsId = new ResultSetId(rawResultSetId);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Received initial result partition read request for JobId: " + jobId + " partition: "
+                        + partition + " on channel: " + ccb);
+            }
+            noc = new NetworkOutputChannel(ccb, nBuffers);
+            try {
+                partitionManager.initializeResultPartitionReader(jobId, rsId, partition, noc);
+            } catch (HyracksException e) {
+                LOGGER.warn("Failed to initialize result partition reader", e);
+                noc.abort();
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+            if (noc == null) {
+                // No request was accepted yet, so there is no output channel to abort.
+                return;
+            }
+            noc.abort();
+        }
+    }
+
+    /**
+     * @return the performance counters of the underlying {@link MuxDemux}
+     */
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
new file mode 100644
index 0000000..47230a6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/Page.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Thin holder around the {@link ByteBuffer} that backs a single page.
+ */
+public class Page {
+    private final ByteBuffer buffer;
+
+    /**
+     * @param buffer
+     *            - Buffer to wrap; it is stored as-is, not copied
+     */
+    public Page(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * @return the wrapped buffer itself
+     */
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Resets the wrapped buffer through {@link java.nio.Buffer#clear()}.
+     *
+     * @return the wrapped buffer, after the reset
+     */
+    public ByteBuffer clear() {
+        // Buffer.clear() returns its receiver, so the wrapped buffer can be returned directly.
+        buffer.clear();
+        return buffer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
new file mode 100644
index 0000000..34c9eb5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultMemoryManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.result;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.ResultSetPartitionId;
+
+/**
+ * Hands out fixed-size {@link Page}s within a memory budget. While budget remains, pages are allocated on demand;
+ * once it is used up, a page is reclaimed from the partition at the front of a least-recently-used list.
+ * <p>
+ * {@link #requestPage} and {@link #updateReference} (and therefore {@link #pageReferenced}) run under this
+ * object's monitor. The remaining protected helpers do no locking of their own and expect the caller to hold it.
+ */
+public class ResultMemoryManager {
+    private static final int FRAME_SIZE = 32768;
+
+    // Bytes of the budget that have not been turned into pages yet.
+    private int availableMemory;
+
+    // Allocated pages that are currently not assigned to any partition.
+    private final Set<Page> availPages;
+
+    private final LeastRecentlyUsedList leastRecentlyUsedList;
+
+    private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap;
+
+    /**
+     * @param availableMemory
+     *            - Memory budget in bytes; raised to one page if it is smaller than that
+     */
+    public ResultMemoryManager(int availableMemory) {
+        this.availableMemory = availableMemory;
+
+        availPages = new HashSet<>();
+
+        // At least have one page for temporarily storing the results.
+        if (this.availableMemory <= FRAME_SIZE) {
+            this.availableMemory = FRAME_SIZE;
+        }
+
+        leastRecentlyUsedList = new LeastRecentlyUsedList();
+        resultPartitionNodesMap = new HashMap<>();
+    }
+
+    /**
+     * Provides a cleared page and records it as held by the given partition. A free page is used if one exists,
+     * otherwise a new page is allocated while budget remains, otherwise a page is evicted.
+     *
+     * @param resultSetPartitionId
+     *            - Partition the page is handed to
+     * @param resultState
+     *            - State of that partition; needed to register the partition when it is not tracked yet
+     * @return the page, already cleared
+     * @throws HyracksDataException
+     *             if evicting a page fails
+     */
+    public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+            throws HyracksDataException {
+        Page page;
+        if (availPages.isEmpty()) {
+            if (availableMemory >= FRAME_SIZE) {
+                /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyracks'
+                 * allocateFrame() instead of direct ByteBuffer.allocate()?
+                 */
+                availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+                availableMemory -= FRAME_SIZE;
+                page = getAvailablePage();
+            } else {
+                page = evictPage();
+            }
+        } else {
+            page = getAvailablePage();
+        }
+
+        page.clear();
+
+        /*
+         * It is extremely important to update the reference after obtaining the page because, in the cases where
+         * memory manager is allocated only one page of memory, the front of the LRU list should not be created by the
+         * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the
+         * page, then make a updateReference call which in turn creates a new node in the LRU list and then add the
+         * page to it.
+         */
+        PartitionNode pn = updateReference(resultSetPartitionId, resultState);
+        pn.add(page);
+        return page;
+    }
+
+    /**
+     * Marks the given partition as most recently used. Nothing happens if the partition is not tracked.
+     *
+     * @param resultSetPartitionId
+     *            - Partition whose page was referenced
+     */
+    public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
+        // When a page is referenced the result partition writer should already be known, so we pass null.
+        updateReference(resultSetPartitionId, null);
+    }
+
+    /**
+     * @return the size in bytes of every page handed out
+     */
+    public static int getPageSize() {
+        return FRAME_SIZE;
+    }
+
+    /**
+     * Appends the node to the tail of the LRU list and (re-)registers it in the partition map. The caller must hold
+     * this object's monitor.
+     */
+    protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) {
+        leastRecentlyUsedList.add(pn);
+        resultPartitionNodesMap.put(resultSetPartitionId, pn);
+    }
+
+    /**
+     * Moves the partition's node to the most-recently-used end of the LRU list, creating the node first when the
+     * partition is not tracked yet and a result state is supplied.
+     * <p>
+     * The whole method runs under this object's monitor: the lookup and the insertion must not interleave with
+     * {@link #requestPage}, which mutates the same map and list.
+     *
+     * @param resultSetPartitionId
+     *            - Partition to touch
+     * @param resultState
+     *            - State used to create the node for an untracked partition; may be null
+     * @return the partition's node, or null if the partition is untracked and no result state was given
+     */
+    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
+            ResultState resultState) {
+        // The map never holds null values, so a null lookup result means "not tracked".
+        PartitionNode pn = resultPartitionNodesMap.get(resultSetPartitionId);
+        if (pn == null) {
+            if (resultState != null) {
+                pn = new PartitionNode(resultSetPartitionId, resultState);
+                insertPartitionNode(resultSetPartitionId, pn);
+            }
+            return pn;
+        }
+
+        leastRecentlyUsedList.remove(pn);
+        insertPartitionNode(resultSetPartitionId, pn);
+        return pn;
+    }
+
+    /**
+     * Reclaims one page from the least recently used partition. The caller must hold this object's monitor.
+     *
+     * @return the reclaimed page
+     * @throws HyracksDataException
+     *             if the partition's result state fails to return a page
+     * @throws IllegalStateException
+     *             if no partition is tracked, so there is nothing to evict from
+     */
+    protected Page evictPage() throws HyracksDataException {
+        PartitionNode pn = leastRecentlyUsedList.getFirst();
+        if (pn == null) {
+            throw new IllegalStateException("No result partition holds a page that could be evicted");
+        }
+        ResultState resultState = pn.getResultState();
+        Page page = resultState.returnPage();
+
+        /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just
+         * take away all the pages allocated to it and add to the available pages set.
+         */
+        if (page == null) {
+            availPages.addAll(pn);
+            pn.clear();
+            resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+            leastRecentlyUsedList.remove(pn);
+
+            /* Based on the assumption that if the result partition writer returned a null page, it should be lying
+             * about the number of pages it holds in which case we just evict all the pages it holds and should thus be
+             * able to add all those pages to available set and we have at least one page to allocate back.
+             */
+            page = getAvailablePage();
+        } else {
+            pn.remove(page);
+
+            // If the partition no more holds any pages, remove it from the linked list and the hash map.
+            if (pn.isEmpty()) {
+                resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+                leastRecentlyUsedList.remove(pn);
+            }
+        }
+
+        return page;
+    }
+
+    /**
+     * Takes an arbitrary page out of the free set. The caller must hold this object's monitor and must make sure
+     * the free set is not empty.
+     */
+    protected Page getAvailablePage() {
+        Iterator<Page> iter = availPages.iterator();
+        Page page = iter.next();
+        iter.remove();
+        return page;
+    }
+
+    /**
+     * Doubly linked list of partition nodes, least recently used node first.
+     */
+    private static class LeastRecentlyUsedList {
+        private PartitionNode head;
+
+        private PartitionNode tail;
+
+        public LeastRecentlyUsedList() {
+            head = null;
+            tail = null;
+        }
+
+        /** Appends the node at the most-recently-used end. */
+        public void add(PartitionNode node) {
+            if (head == null) {
+                head = tail = node;
+                return;
+            }
+            tail.setNext(node);
+            node.setPrev(tail);
+            tail = node;
+        }
+
+        /** Unlinks the node, which must currently be part of this list. */
+        public void remove(PartitionNode node) {
+            if ((node == head) && (node == tail)) {
+                head = tail = null;
+            } else if (node == head) {
+                head = head.getNext();
+                head.setPrev(null);
+            } else if (node == tail) {
+                tail = tail.getPrev();
+                tail.setNext(null);
+            } else {
+                PartitionNode prev = node.getPrev();
+                PartitionNode next = node.getNext();
+                prev.setNext(next);
+                next.setPrev(prev);
+            }
+            // Drop the removed node's own links so it neither keeps former neighbours reachable nor carries
+            // stale pointers back into the list when it is re-added.
+            node.setPrev(null);
+            node.setNext(null);
+        }
+
+        /** @return the least recently used node, or null if the list is empty */
+        public PartitionNode getFirst() {
+            return head;
+        }
+    }
+
+    /**
+     * LRU list node for one partition; the set content is the pages currently held by that partition.
+     */
+    private static class PartitionNode extends HashSet<Page> {
+        private static final long serialVersionUID = 1L;
+
+        private final ResultSetPartitionId resultSetPartitionId;
+
+        private final ResultState resultState;
+
+        private PartitionNode prev;
+
+        private PartitionNode next;
+
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
+            this.resultSetPartitionId = resultSetPartitionId;
+            this.resultState = resultState;
+            prev = null;
+            next = null;
+        }
+
+        public ResultSetPartitionId getResultSetPartitionId() {
+            return resultSetPartitionId;
+        }
+
+        public ResultState getResultState() {
+            return resultState;
+        }
+
+        public void setPrev(PartitionNode node) {
+            prev = node;
+        }
+
+        public PartitionNode getPrev() {
+            return prev;
+        }
+
+        public void setNext(PartitionNode node) {
+            next = node;
+        }
+
+        public PartitionNode getNext() {
+            return next;
+        }
+    }
+}

Reply via email to