http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
deleted file mode 100644
index 8e9e3dc..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.api.dataset;
-
-import java.util.Arrays;
-
-public class ResultSetMetaData {
-    private final DatasetDirectoryRecord[] records;
-    private final boolean ordered;
-
-    ResultSetMetaData(int len, boolean ordered) {
-        this.records = new DatasetDirectoryRecord[len];
-        this.ordered = ordered;
-    }
-
-    public boolean getOrderedResult() {
-        return ordered;
-    }
-
-    public DatasetDirectoryRecord[] getRecords() {
-        return records;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("{ordered: ").append(ordered).append(", records: 
").append(Arrays.toString(records));
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index c4c7320..b0a8017 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import 
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
index 8a3e15a..8343fe0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/partitions/ResultSetPartitionId.java
@@ -20,7 +20,7 @@ package org.apache.hyracks.api.partitions;
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.api.job.JobId;
 
 public final class ResultSetPartitionId implements Serializable {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
new file mode 100644
index 0000000..41b9d1a
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+
+public interface IResultDirectory {
+    /**
+     * Gets the result status for the given result set.
+     *
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @return {@link Status}
+     * @throws Exception
+     */
+    Status getResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+
+    /**
+     * Gets the IP Addresses and ports for the partition generating the result 
for each location.
+     *
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @param knownRecords
+     *            Locations that are already known to the client
+     * @return {@link ResultDirectoryRecord[]}
+     * @throws Exception
+     */
+    ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId rsId, 
ResultDirectoryRecord[] knownRecords)
+            throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
new file mode 100644
index 0000000..d45cf44
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobId;
+
+public interface IResultManager {
+
+    Set<JobId> getJobIds();
+
+    IResultStateRecord getState(JobId jobId);
+
+    void sweep(JobId jobId);
+
+    /**
+     * Removes all references and deletes persisted files for
+     * all expired results.
+     */
+    void sweepExpiredResultSets();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
new file mode 100644
index 0000000..a539d37
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+
+public interface IResultPartitionManager extends IResultManager {
+    IFrameWriter createResultPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, boolean orderedResult,
+            boolean asyncMode, int partition, int nPartitions, long maxReads) 
throws HyracksException;
+
+    void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int 
partition, int nPartitions,
+            boolean orderedResult, boolean emptyResult) throws 
HyracksException;
+
+    void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, 
int partition) throws HyracksException;
+
+    void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, 
int partition, IFrameWriter noc)
+            throws HyracksException;
+
+    void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+
+    void abortReader(JobId jobId);
+
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
new file mode 100644
index 0000000..ff2e48a
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSet.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+public interface IResultSet {
+    IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws 
HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
new file mode 100644
index 0000000..0884c53
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultSetReader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IResultSetReader {
+    Status getResultStatus();
+
+    int read(IFrame frame) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
new file mode 100644
index 0000000..fe6bc15
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+public interface IResultStateRecord {
+    long getTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
new file mode 100644
index 0000000..71792be
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.comm.NetworkAddress;
+
+public class ResultDirectoryRecord implements Serializable {
+    public enum Status {
+        IDLE,
+        RUNNING,
+        SUCCESS,
+        FAILED
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private NetworkAddress address;
+
+    private boolean readEOS;
+
+    private Status status;
+
+    private boolean empty;
+
+    public ResultDirectoryRecord() {
+        this.address = null;
+        this.readEOS = false;
+        this.status = Status.IDLE;
+    }
+
+    public void setNetworkAddress(NetworkAddress address) {
+        this.address = address;
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return address;
+    }
+
+    public void setEmpty(boolean empty) {
+        this.empty = empty;
+    }
+
+    public boolean isEmpty() {
+        return empty;
+    }
+
+    public void readEOS() {
+        this.readEOS = true;
+    }
+
+    public boolean hasReachedReadEOS() {
+        return readEOS;
+    }
+
+    public void start() {
+        updateStatus(Status.RUNNING);
+    }
+
+    public void writeEOS() {
+        updateStatus(Status.SUCCESS);
+    }
+
+    public void fail() {
+        status = Status.FAILED;
+    }
+
+    private void updateStatus(final ResultDirectoryRecord.Status newStatus) {
+        // FAILED is a stable status
+        if (status != Status.FAILED) {
+            status = newStatus;
+        }
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof ResultDirectoryRecord)) {
+            return false;
+        }
+        return address.equals(((ResultDirectoryRecord) o).address);
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(address) + " " + status + (empty ? " (empty)" : 
"") + (readEOS ? " (EOS)" : "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
new file mode 100644
index 0000000..b8ddbd2
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ResultJobRecord implements IResultStateRecord {
+    public enum State {
+        IDLE,
+        RUNNING,
+        SUCCESS,
+        FAILED
+    }
+
+    public static class Status implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        State state = State.IDLE;
+
+        private List<Exception> exceptions;
+
+        public State getState() {
+            return state;
+        }
+
+        void setState(State state) {
+            this.state = state;
+        }
+
+        public List<Exception> getExceptions() {
+            return exceptions;
+        }
+
+        void setExceptions(List<Exception> exceptions) {
+            this.exceptions = exceptions;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("{ \"state\": \"").append(state.name()).append("\"");
+            if (exceptions != null && !exceptions.isEmpty()) {
+                sb.append(", \"exceptions\": ");
+                List<String> msgs = new ArrayList<>();
+                exceptions.forEach(e -> msgs.add("\"" + e.getMessage() + 
"\""));
+                sb.append(Arrays.toString(msgs.toArray()));
+            }
+            sb.append(" }");
+            return sb.toString();
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final long timestamp;
+
+    private Status status;
+
+    private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new 
HashMap<>();
+
+    public ResultJobRecord() {
+        this.timestamp = System.nanoTime();
+        this.status = new Status();
+    }
+
+    private void updateState(State newStatus) {
+        // FAILED is a stable status
+        if (status.state != State.FAILED) {
+            status.setState(newStatus);
+        }
+    }
+
+    public void start() {
+        updateState(State.RUNNING);
+    }
+
+    public void success() {
+        updateState(State.SUCCESS);
+    }
+
+    public void fail(ResultSetId rsId, int partition) {
+        getOrCreateDirectoryRecord(rsId, partition).fail();
+    }
+
+    public void fail(List<Exception> exceptions) {
+        updateState(State.FAILED);
+        status.setExceptions(exceptions);
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ \"status\": ").append(status.toString()).append(", ");
+        sb.append("\"timestamp\": ").append(timestamp).append(", ");
+        sb.append("\"resultsets\": 
").append(Arrays.toString(resultSetMetadataMap.entrySet().toArray())).append(" 
}");
+        return sb.toString();
+    }
+
+    public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, 
int nPartitions)
+            throws HyracksDataException {
+        ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
+        if (rsMd == null) {
+            resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, 
orderedResult));
+        } else if (rsMd.getOrderedResult() != orderedResult || 
rsMd.getRecords().length != nPartitions) {
+            throw 
HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, 
rsId.toString());
+        }
+        //TODO(tillw) throwing a HyracksDataException here hangs the execution 
tests
+    }
+
+    public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
+        return resultSetMetadataMap.get(rsId);
+    }
+
+    public synchronized ResultDirectoryRecord 
getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
+        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            records[partition] = new ResultDirectoryRecord();
+        }
+        return records[partition];
+    }
+
+    public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId 
rsId, int partition)
+            throws HyracksDataException {
+        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, 
partition, rsId);
+        }
+        return records[partition];
+    }
+
+    public synchronized void updateState(ResultSetId rsId) {
+        int successCount = 0;
+        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+        for (ResultDirectoryRecord record : records) {
+            if ((record != null) && (record.getStatus() == 
ResultDirectoryRecord.Status.SUCCESS)) {
+                successCount++;
+            }
+        }
+        if (successCount == records.length) {
+            success();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
new file mode 100644
index 0000000..ffd4ced
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.io.Serializable;
+
+public class ResultSetId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long id;
+
+    public ResultSetId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof ResultSetId)) {
+            return false;
+        }
+        return ((ResultSetId) o).id == id;
+    }
+
+    @Override
+    public String toString() {
+        return "RSID:" + id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
new file mode 100644
index 0000000..b7b8f1c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultSetMetaData.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.result;
+
+import java.util.Arrays;
+
+public class ResultSetMetaData {
+    private final ResultDirectoryRecord[] records;
+    private final boolean ordered;
+
+    ResultSetMetaData(int len, boolean ordered) {
+        this.records = new ResultDirectoryRecord[len];
+        this.ordered = ordered;
+    }
+
+    public boolean getOrderedResult() {
+        return ordered;
+    }
+
+    public ResultDirectoryRecord[] getRecords() {
+        return records;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ordered: ").append(ordered).append(", records: 
").append(Arrays.toString(records));
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
deleted file mode 100644
index 7f549ca..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import 
org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.client.net.ClientNetworkManager;
-import org.apache.hyracks.control.nc.resources.memory.FrameManager;
-
-public class HyracksDataset implements IHyracksDataset {
-    private final IHyracksDatasetDirectoryServiceConnection 
datasetDirectoryServiceConnection;
-
-    private final ClientNetworkManager netManager;
-
-    private final IHyracksCommonContext datasetClientCtx;
-
-    public HyracksDataset(IHyracksClientConnection hcc, int frameSize, int 
nReaders) throws Exception {
-        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
-        datasetDirectoryServiceConnection =
-                new 
HyracksDatasetDirectoryServiceConnection(ddsAddress.getAddress(), 
ddsAddress.getPort());
-
-        netManager = new ClientNetworkManager(nReaders);
-        netManager.start();
-
-        datasetClientCtx = new DatasetClientContext(frameSize);
-    }
-
-    @Override
-    public IHyracksDatasetReader createReader(JobId jobId, ResultSetId 
resultSetId) throws HyracksDataException {
-        IHyracksDatasetReader reader = null;
-        try {
-            reader = new 
HyracksDatasetReader(datasetDirectoryServiceConnection, netManager, 
datasetClientCtx, jobId,
-                    resultSetId);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        return reader;
-    }
-
-    static class DatasetClientContext extends FrameManager implements 
IHyracksCommonContext {
-
-        DatasetClientContext(int frameSize) {
-            super(frameSize);
-        }
-
-        @Override
-        public IIOManager getIoManager() {
-            return null;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
deleted file mode 100644
index 63139d9..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import 
org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.RPCInterface;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-
-//TODO(madhusudancs): Should this implementation be moved to 
org.apache.hyracks.client?
-public class HyracksDatasetDirectoryServiceConnection implements 
IHyracksDatasetDirectoryServiceConnection {
-    private final IPCSystem ipc;
-    private final IHyracksDatasetDirectoryServiceInterface ddsi;
-
-    public HyracksDatasetDirectoryServiceConnection(String ddsHost, int 
ddsPort) throws Exception {
-        RPCInterface rpci = new RPCInterface();
-        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
-        ipc.start();
-        IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new 
InetSocketAddress(ddsHost, ddsPort));
-        this.ddsi = new 
HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
-    }
-
-    @Override
-    public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws 
Exception {
-        return ddsi.getDatasetResultStatus(jobId, rsId);
-    }
-
-    @Override
-    public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, 
ResultSetId rsId,
-            DatasetDirectoryRecord[] knownRecords) throws Exception {
-        return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
deleted file mode 100644
index 7eeb913..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.RPCInterface;
-
-//TODO(madhusudancs): Should this implementation be moved to 
org.apache.hyracks.client?
-public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements 
IHyracksDatasetDirectoryServiceInterface {
-    private final IIPCHandle ipcHandle;
-
-    private final RPCInterface rpci;
-
-    public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle 
ipcHandle, RPCInterface rpci) {
-        this.ipcHandle = ipcHandle;
-        this.rpci = rpci;
-    }
-
-    @Override
-    public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws 
Exception {
-        HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
-                new 
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId);
-        return (Status) rpci.call(ipcHandle, gdrlf);
-    }
-
-    @Override
-    public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, 
ResultSetId rsId,
-            DatasetDirectoryRecord[] knownRecords) throws Exception {
-        HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction 
gdrlf =
-                new 
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId, 
knownRecords);
-        return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
deleted file mode 100644
index b1566f4..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.client.dataset;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.channels.IInputChannel;
-import org.apache.hyracks.api.channels.IInputChannelMonitor;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import 
org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.client.net.ClientNetworkManager;
-import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel;
-import org.apache.hyracks.util.annotations.NotThreadSafe;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-@NotThreadSafe
-public class HyracksDatasetReader implements IHyracksDatasetReader {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private static final int NUM_READ_BUFFERS = 1;
-    private final IHyracksDatasetDirectoryServiceConnection datasetDirectory;
-    private final ClientNetworkManager netManager;
-    private final IHyracksCommonContext datasetClientCtx;
-    private final JobId jobId;
-    private final ResultSetId resultSetId;
-    private DatasetDirectoryRecord[] knownRecords;
-    private DatasetInputChannelMonitor[] monitors;
-    private DatasetInputChannelMonitor currentRecordMonitor;
-    private DatasetNetworkInputChannel currentRecordChannel;
-    private int currentRecord;
-
-    public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection 
datasetDirectory,
-            ClientNetworkManager netManager, IHyracksCommonContext 
datasetClientCtx, JobId jobId,
-            ResultSetId resultSetId) {
-        this.datasetDirectory = datasetDirectory;
-        this.netManager = netManager;
-        this.datasetClientCtx = datasetClientCtx;
-        this.jobId = jobId;
-        this.resultSetId = resultSetId;
-        currentRecord = -1;
-    }
-
-    @Override
-    public Status getResultStatus() {
-        try {
-            return datasetDirectory.getDatasetResultStatus(jobId, resultSetId);
-        } catch (HyracksDataException e) {
-            if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
-                LOGGER.log(Level.WARN, "Exception retrieving result set for 
job " + jobId, e);
-            }
-        } catch (Exception e) {
-            LOGGER.log(Level.WARN, "Exception retrieving result set for job " 
+ jobId, e);
-        }
-        return null;
-    }
-
-    @Override
-    public int read(IFrame frame) throws HyracksDataException {
-        frame.reset();
-        int readSize = 0;
-        if (isFirstRead() && !hasNextRecord()) {
-            return readSize;
-        }
-        // read until frame is full or all dataset records have been read
-        while (readSize < frame.getFrameSize()) {
-            if (currentRecordMonitor.hasMoreFrames()) {
-                final ByteBuffer readBuffer = 
currentRecordChannel.getNextBuffer();
-                if (readBuffer == null) {
-                    throw new IllegalStateException("Unexpected empty frame");
-                }
-                currentRecordMonitor.notifyFrameRead();
-                if (readSize == 0) {
-                    final int nBlocks = 
FrameHelper.deserializeNumOfMinFrame(readBuffer);
-                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
-                    frame.getBuffer().clear();
-                }
-                frame.getBuffer().put(readBuffer);
-                currentRecordChannel.recycleBuffer(readBuffer);
-                readSize = frame.getBuffer().position();
-            } else {
-                currentRecordChannel.close();
-                if (currentRecordMonitor.failed()) {
-                    throw 
HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
-                }
-                if (isLastRecord() || !hasNextRecord()) {
-                    break;
-                }
-            }
-        }
-        frame.getBuffer().flip();
-        return readSize;
-    }
-
-    private SocketAddress getSocketAddress(DatasetDirectoryRecord record) 
throws HyracksDataException {
-        try {
-            final NetworkAddress netAddr = record.getNetworkAddress();
-            return new 
InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), 
netAddr.getPort());
-        } catch (UnknownHostException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private DatasetInputChannelMonitor getMonitor(int partition) {
-        if (knownRecords == null || knownRecords[partition] == null) {
-            throw new IllegalStateException("Accessing monitors before 
obtaining the corresponding addresses");
-        }
-        if (monitors == null) {
-            monitors = new DatasetInputChannelMonitor[knownRecords.length];
-        }
-        if (monitors[partition] == null) {
-            monitors[partition] = new DatasetInputChannelMonitor();
-        }
-        return monitors[partition];
-    }
-
-    private boolean hasNextRecord() throws HyracksDataException {
-        currentRecord++;
-        DatasetDirectoryRecord record = getRecord(currentRecord);
-        // skip empty records
-        while (record.isEmpty() && ++currentRecord < knownRecords.length) {
-            record = getRecord(currentRecord);
-        }
-        if (currentRecord == knownRecords.length) {
-            // exhausted all known records
-            return false;
-        }
-        requestRecordData(record);
-        return true;
-    }
-
-    private DatasetDirectoryRecord getRecord(int recordNum) throws 
HyracksDataException {
-        try {
-            while (knownRecords == null || knownRecords[recordNum] == null) {
-                knownRecords = 
datasetDirectory.getDatasetResultLocations(jobId, resultSetId, knownRecords);
-            }
-            return knownRecords[recordNum];
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private void requestRecordData(DatasetDirectoryRecord record) throws 
HyracksDataException {
-        currentRecordChannel = new DatasetNetworkInputChannel(netManager, 
getSocketAddress(record), jobId, resultSetId,
-                currentRecord, NUM_READ_BUFFERS);
-        currentRecordMonitor = getMonitor(currentRecord);
-        currentRecordChannel.registerMonitor(currentRecordMonitor);
-        currentRecordChannel.open(datasetClientCtx);
-    }
-
-    private boolean isFirstRead() {
-        return currentRecord == -1;
-    }
-
-    private boolean isLastRecord() {
-        return knownRecords != null && currentRecord == knownRecords.length - 
1;
-    }
-
-    private static class DatasetInputChannelMonitor implements 
IInputChannelMonitor {
-
-        private int availableFrames;
-        private boolean eos;
-        private boolean failed;
-
-        DatasetInputChannelMonitor() {
-            eos = false;
-            failed = false;
-        }
-
-        @Override
-        public synchronized void notifyFailure(IInputChannel channel, int 
errorCode) {
-            failed = true;
-            notifyAll();
-        }
-
-        @Override
-        public synchronized void notifyDataAvailability(IInputChannel channel, 
int nFrames) {
-            availableFrames += nFrames;
-            notifyAll();
-        }
-
-        @Override
-        public synchronized void notifyEndOfStream(IInputChannel channel) {
-            eos = true;
-            notifyAll();
-        }
-
-        synchronized boolean failed() {
-            return failed;
-        }
-
-        synchronized void notifyFrameRead() {
-            availableFrames--;
-            notifyAll();
-        }
-
-        synchronized boolean hasMoreFrames() throws HyracksDataException {
-            while (!failed && !eos && availableFrames == 0) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw HyracksDataException.create(e);
-                }
-            }
-            return !failed && !isFullyConsumed();
-        }
-
-        private synchronized boolean isFullyConsumed() {
-            return availableFrames == 0 && eos;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
new file mode 100644
index 0000000..6f8c4d0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+//TODO(madhusudancs): Should this implementation be moved to 
org.apache.hyracks.client?
+public class ResultDirectory implements IResultDirectory {
+    private final IPCSystem ipc;
+    private final IResultDirectory remoteResultDirectory;
+
+    public ResultDirectory(String resultHost, int resultPort) throws Exception 
{
+        RPCInterface rpci = new RPCInterface();
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
+        ipc.start();
+        IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new 
InetSocketAddress(resultHost, resultPort));
+        this.remoteResultDirectory = new 
ResultDirectoryRemoteProxy(ddsIpchandle, rpci);
+    }
+
+    @Override
+    public Status getResultStatus(JobId jobId, ResultSetId rsId) throws 
Exception {
+        return remoteResultDirectory.getResultStatus(jobId, rsId);
+    }
+
+    @Override
+    public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId 
rsId,
+            ResultDirectoryRecord[] knownRecords) throws Exception {
+        return remoteResultDirectory.getResultLocations(jobId, rsId, 
knownRecords);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
new file mode 100644
index 0000000..77c6e4b
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.RPCInterface;
+
+//TODO(madhusudancs): Should this implementation be moved to 
org.apache.hyracks.client?
+public class ResultDirectoryRemoteProxy implements IResultDirectory {
+    private final IIPCHandle ipcHandle;
+
+    private final RPCInterface rpci;
+
+    public ResultDirectoryRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) 
{
+        this.ipcHandle = ipcHandle;
+        this.rpci = rpci;
+    }
+
+    @Override
+    public Status getResultStatus(JobId jobId, ResultSetId rsId) throws 
Exception {
+        HyracksClientInterfaceFunctions.GetResultStatusFunction gdrlf =
+                new 
HyracksClientInterfaceFunctions.GetResultStatusFunction(jobId, rsId);
+        return (Status) rpci.call(ipcHandle, gdrlf);
+    }
+
+    @Override
+    public ResultDirectoryRecord[] getResultLocations(JobId jobId, ResultSetId 
rsId,
+            ResultDirectoryRecord[] knownRecords) throws Exception {
+        HyracksClientInterfaceFunctions.GetResultLocationsFunction gdrlf =
+                new 
HyracksClientInterfaceFunctions.GetResultLocationsFunction(jobId, rsId, 
knownRecords);
+        return (ResultDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
new file mode 100644
index 0000000..ef93cce
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.net.ClientNetworkManager;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+
+public class ResultSet implements IResultSet {
+    private final IResultDirectory resultDirectory;
+
+    private final ClientNetworkManager netManager;
+
+    private final IHyracksCommonContext resultClientCtx;
+
+    public ResultSet(IHyracksClientConnection hcc, int frameSize, int 
nReaders) throws Exception {
+        NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
+        resultDirectory = new ResultDirectory(ddsAddress.getAddress(), 
ddsAddress.getPort());
+
+        netManager = new ClientNetworkManager(nReaders);
+        netManager.start();
+
+        resultClientCtx = new ResultClientContext(frameSize);
+    }
+
+    @Override
+    public IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) 
throws HyracksDataException {
+        IResultSetReader reader = null;
+        try {
+            reader = new ResultSetReader(resultDirectory, netManager, 
resultClientCtx, jobId, resultSetId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        return reader;
+    }
+
+    static class ResultClientContext extends FrameManager implements 
IHyracksCommonContext {
+
+        ResultClientContext(int frameSize) {
+            super(frameSize);
+        }
+
+        @Override
+        public IIOManager getIoManager() {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
new file mode 100644
index 0000000..092d860
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.client.result;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.net.ClientNetworkManager;
+import org.apache.hyracks.comm.channels.ResultNetworkInputChannel;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe
+public class ResultSetReader implements IResultSetReader {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int NUM_READ_BUFFERS = 1;
+    private final IResultDirectory resultDirectory;
+    private final ClientNetworkManager netManager;
+    private final IHyracksCommonContext resultClientCtx;
+    private final JobId jobId;
+    private final ResultSetId resultSetId;
+    private ResultDirectoryRecord[] knownRecords;
+    private ResultInputChannelMonitor[] monitors;
+    private ResultInputChannelMonitor currentRecordMonitor;
+    private ResultNetworkInputChannel currentRecordChannel;
+    private int currentRecord;
+
+    public ResultSetReader(IResultDirectory resultDirectory, 
ClientNetworkManager netManager,
+            IHyracksCommonContext resultClientCtx, JobId jobId, ResultSetId 
resultSetId) {
+        this.resultDirectory = resultDirectory;
+        this.netManager = netManager;
+        this.resultClientCtx = resultClientCtx;
+        this.jobId = jobId;
+        this.resultSetId = resultSetId;
+        currentRecord = -1;
+    }
+
+    @Override
+    public Status getResultStatus() {
+        try {
+            return resultDirectory.getResultStatus(jobId, resultSetId);
+        } catch (HyracksDataException e) {
+            if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
+                LOGGER.log(Level.WARN, "Exception retrieving result set for 
job " + jobId, e);
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Exception retrieving result set for job " 
+ jobId, e);
+        }
+        return null;
+    }
+
+    @Override
+    public int read(IFrame frame) throws HyracksDataException {
+        frame.reset();
+        int readSize = 0;
+        if (isFirstRead() && !hasNextRecord()) {
+            return readSize;
+        }
+        // read until frame is full or all result records have been read
+        while (readSize < frame.getFrameSize()) {
+            if (currentRecordMonitor.hasMoreFrames()) {
+                final ByteBuffer readBuffer = 
currentRecordChannel.getNextBuffer();
+                if (readBuffer == null) {
+                    throw new IllegalStateException("Unexpected empty frame");
+                }
+                currentRecordMonitor.notifyFrameRead();
+                if (readSize == 0) {
+                    final int nBlocks = 
FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                    frame.getBuffer().clear();
+                }
+                frame.getBuffer().put(readBuffer);
+                currentRecordChannel.recycleBuffer(readBuffer);
+                readSize = frame.getBuffer().position();
+            } else {
+                currentRecordChannel.close();
+                if (currentRecordMonitor.failed()) {
+                    throw 
HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+                }
+                if (isLastRecord() || !hasNextRecord()) {
+                    break;
+                }
+            }
+        }
+        frame.getBuffer().flip();
+        return readSize;
+    }
+
+    private SocketAddress getSocketAddress(ResultDirectoryRecord record) 
throws HyracksDataException {
+        try {
+            final NetworkAddress netAddr = record.getNetworkAddress();
+            return new 
InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), 
netAddr.getPort());
+        } catch (UnknownHostException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private ResultInputChannelMonitor getMonitor(int partition) {
+        if (knownRecords == null || knownRecords[partition] == null) {
+            throw new IllegalStateException("Accessing monitors before 
obtaining the corresponding addresses");
+        }
+        if (monitors == null) {
+            monitors = new ResultInputChannelMonitor[knownRecords.length];
+        }
+        if (monitors[partition] == null) {
+            monitors[partition] = new ResultInputChannelMonitor();
+        }
+        return monitors[partition];
+    }
+
+    private boolean hasNextRecord() throws HyracksDataException {
+        currentRecord++;
+        ResultDirectoryRecord record = getRecord(currentRecord);
+        // skip empty records
+        while (record.isEmpty() && ++currentRecord < knownRecords.length) {
+            record = getRecord(currentRecord);
+        }
+        if (currentRecord == knownRecords.length) {
+            // exhausted all known records
+            return false;
+        }
+        requestRecordData(record);
+        return true;
+    }
+
+    private ResultDirectoryRecord getRecord(int recordNum) throws 
HyracksDataException {
+        try {
+            while (knownRecords == null || knownRecords[recordNum] == null) {
+                knownRecords = resultDirectory.getResultLocations(jobId, 
resultSetId, knownRecords);
+            }
+            return knownRecords[recordNum];
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void requestRecordData(ResultDirectoryRecord record) throws 
HyracksDataException {
+        currentRecordChannel = new ResultNetworkInputChannel(netManager, 
getSocketAddress(record), jobId, resultSetId,
+                currentRecord, NUM_READ_BUFFERS);
+        currentRecordMonitor = getMonitor(currentRecord);
+        currentRecordChannel.registerMonitor(currentRecordMonitor);
+        currentRecordChannel.open(resultClientCtx);
+    }
+
+    private boolean isFirstRead() {
+        return currentRecord == -1;
+    }
+
+    private boolean isLastRecord() {
+        return knownRecords != null && currentRecord == knownRecords.length - 
1;
+    }
+
+    private static class ResultInputChannelMonitor implements 
IInputChannelMonitor {
+
+        private int availableFrames;
+        private boolean eos;
+        private boolean failed;
+
+        ResultInputChannelMonitor() {
+            eos = false;
+            failed = false;
+        }
+
+        @Override
+        public synchronized void notifyFailure(IInputChannel channel, int 
errorCode) {
+            failed = true;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyDataAvailability(IInputChannel channel, 
int nFrames) {
+            availableFrames += nFrames;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyEndOfStream(IInputChannel channel) {
+            eos = true;
+            notifyAll();
+        }
+
+        synchronized boolean failed() {
+            return failed;
+        }
+
+        synchronized void notifyFrameRead() {
+            availableFrames--;
+            notifyAll();
+        }
+
+        synchronized boolean hasMoreFrames() throws HyracksDataException {
+            while (!failed && !eos && availableFrames == 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            return !failed && !isFullyConsumed();
+        }
+
+        private synchronized boolean isFullyConsumed() {
+            return availableFrames == 0 && eos;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
deleted file mode 100644
index 0f96a6e..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.comm.channels;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.hyracks.api.channels.IInputChannel;
-import org.apache.hyracks.api.channels.IInputChannelMonitor;
-import org.apache.hyracks.api.comm.IBufferAcceptor;
-import org.apache.hyracks.api.comm.IChannelControlBlock;
-import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class DatasetNetworkInputChannel implements IInputChannel {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
-    private final IChannelConnectionFactory netManager;
-
-    private final SocketAddress remoteAddress;
-
-    private final JobId jobId;
-
-    private final ResultSetId resultSetId;
-
-    private final int partition;
-
-    private final Queue<ByteBuffer> fullQueue;
-
-    private final int nBuffers;
-
-    private IChannelControlBlock ccb;
-
-    private IInputChannelMonitor monitor;
-
-    private Object attachment;
-
-    public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, 
SocketAddress remoteAddress, JobId jobId,
-            ResultSetId resultSetId, int partition, int nBuffers) {
-        this.netManager = netManager;
-        this.remoteAddress = remoteAddress;
-        this.jobId = jobId;
-        this.resultSetId = resultSetId;
-        this.partition = partition;
-        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
-        this.nBuffers = nBuffers;
-    }
-
-    @Override
-    public void registerMonitor(IInputChannelMonitor monitor) {
-        this.monitor = monitor;
-    }
-
-    @Override
-    public void setAttachment(Object attachment) {
-        this.attachment = attachment;
-    }
-
-    @Override
-    public Object getAttachment() {
-        return attachment;
-    }
-
-    @Override
-    public synchronized ByteBuffer getNextBuffer() {
-        return fullQueue.poll();
-    }
-
-    @Override
-    public void recycleBuffer(ByteBuffer buffer) {
-        buffer.clear();
-        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
-    }
-
-    @Override
-    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
-        try {
-            ccb = netManager.connect(remoteAddress);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        ccb.getReadInterface().setFullBufferAcceptor(new 
ReadFullBufferAcceptor());
-        ccb.getWriteInterface().setEmptyBufferAcceptor(new 
WriteEmptyBufferAcceptor());
-        for (int i = 0; i < nBuffers; ++i) {
-            
ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
-        }
-        ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
-        writeBuffer.putLong(jobId.getId());
-        writeBuffer.putLong(resultSetId.getId());
-        writeBuffer.putInt(partition);
-        writeBuffer.flip();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Sending partition request for JobId: " + jobId + " 
partition: " + partition + " on channel: "
-                    + ccb);
-        }
-        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
-        ccb.getWriteInterface().getFullBufferAcceptor().close();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-
-    }
-
-    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
-        @Override
-        public void accept(ByteBuffer buffer) {
-            fullQueue.add(buffer);
-            monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
-        }
-
-        @Override
-        public void close() {
-            monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
-        }
-
-        @Override
-        public void error(int ecode) {
-            monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode);
-        }
-    }
-
-    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
-        @Override
-        public void accept(ByteBuffer buffer) {
-            // do nothing
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
new file mode 100644
index 0000000..37540cd
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ResultNetworkInputChannel implements IInputChannel {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IChannelConnectionFactory netManager;
+
+    private final SocketAddress remoteAddress;
+
+    private final JobId jobId;
+
+    private final ResultSetId resultSetId;
+
+    private final int partition;
+
+    private final Queue<ByteBuffer> fullQueue;
+
+    private final int nBuffers;
+
+    private IChannelControlBlock ccb;
+
+    private IInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public ResultNetworkInputChannel(IChannelConnectionFactory netManager, 
SocketAddress remoteAddress, JobId jobId,
+            ResultSetId resultSetId, int partition, int nBuffers) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.jobId = jobId;
+        this.resultSetId = resultSetId;
+        this.partition = partition;
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.nBuffers = nBuffers;
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    @Override
+    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new 
ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new 
WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            
ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(jobId.getId());
+        writeBuffer.putLong(resultSetId.getId());
+        writeBuffer.putInt(partition);
+        writeBuffer.flip();
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Sending partition request for JobId: " + jobId + " 
partition: " + partition + " on channel: "
+                    + ccb);
+        }
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(ResultNetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(ResultNetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(ResultNetworkInputChannel.this, ecode);
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d2f00d8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index a669402..2edbab8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -20,21 +20,21 @@ package org.apache.hyracks.control.cc;
 
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
 import org.apache.hyracks.control.cc.work.CancelJobWork;
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
 import org.apache.hyracks.control.cc.work.DeployJobSpecWork;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobStatusWork;
 import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultDirectoryAddressWork;
 import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
 import org.apache.hyracks.control.cc.work.GetResultStatusWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
@@ -121,19 +121,19 @@ class ClientInterfaceIPCI implements IIPCI {
                 ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
                         jobIdFactory, sjf.getJobParameters(), new 
IPCResponder<>(handle, mid), id));
                 break;
-            case GET_DATASET_DIRECTORY_SERIVICE_INFO:
+            case GET_RESULT_DIRECTORY_ADDRESS:
                 ccs.getWorkQueue().schedule(
-                        new GetDatasetDirectoryServiceInfoWork(ccs, new 
IPCResponder<NetworkAddress>(handle, mid)));
+                        new GetResultDirectoryAddressWork(ccs, new 
IPCResponder<NetworkAddress>(handle, mid)));
                 break;
-            case GET_DATASET_RESULT_STATUS:
-                HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction 
gdrsf =
-                        
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+            case GET_RESULT_STATUS:
+                HyracksClientInterfaceFunctions.GetResultStatusFunction gdrsf =
+                        
(HyracksClientInterfaceFunctions.GetResultStatusFunction) fn;
                 ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, 
gdrsf.getJobId(), gdrsf.getResultSetId(),
                         new IPCResponder<Status>(handle, mid)));
                 break;
-            case GET_DATASET_RESULT_LOCATIONS:
-                
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-                        
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+            case GET_RESULT_LOCATIONS:
+                HyracksClientInterfaceFunctions.GetResultLocationsFunction 
gdrlf =
+                        
(HyracksClientInterfaceFunctions.GetResultLocationsFunction) fn;
                 ccs.getWorkQueue().schedule(new 
GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
                         gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new 
IPCResponder<>(handle, mid)));
                 break;

Reply via email to