This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 051e4fd25c373e51f38a5a10a44ed7e6c303ce54
Author: Ian Maxon <[email protected]>
AuthorDate: Thu Sep 4 10:59:26 2025 -0700

    [ASTERIXDB-3644] Callback for result consumption
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    Add a callback that a node controller uses to notify the cluster
    controller when a result partition has been consumed. Once all
    partitions of a job's result have been consumed, the job is
    removed from the result directory.
    
    Ext-ref: MB-68455
    
    Change-Id: I768eab7bd3ed1dbfda1bd3449264c92952b4cb53
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292
    Reviewed-by: Peeyush Gupta <[email protected]>
    Tested-by: Ian Maxon <[email protected]>
    Integration-Tests: Ian Maxon <[email protected]>
---
 .../api/http/server/QueryResultApiServlet.java     | 12 ++++-
 .../async-deferred/AsyncDeferredQueries.xml        |  2 +-
 .../hyracks/api/result/IResultDirectory.java       |  2 +
 .../api/result/IResultPartitionManager.java        |  4 +-
 .../hyracks/api/result/ResultDirectoryRecord.java  |  4 ++
 .../apache/hyracks/api/result/ResultJobRecord.java | 23 +++++++--
 .../hyracks/client/result/ResultDirectory.java     |  5 ++
 .../client/result/ResultDirectoryRemoteProxy.java  |  5 ++
 .../hyracks/control/cc/ClusterControllerIPCI.java  |  7 +++
 .../control/cc/result/IResultDirectoryService.java |  2 +
 .../control/cc/result/ResultDirectoryService.java  | 12 +++++
 .../cc/work/ReportResultPartitionConsumedWork.java | 57 ++++++++++++++++++++++
 .../control/common/base/IClusterController.java    |  2 +
 .../hyracks/control/common/ipc/CCNCFunctions.java  | 34 +++++++++++++
 .../common/ipc/ClusterControllerRemoteProxy.java   |  7 +++
 .../control/nc/result/ResultPartitionManager.java  | 15 +++++-
 .../control/nc/result/ResultPartitionReader.java   |  4 +-
 17 files changed, 188 insertions(+), 9 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 3b0e031690..09ea35eae4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -65,9 +65,10 @@ public class QueryResultApiServlet extends 
AbstractQueryApiServlet {
         }
         IResultSet resultSet = getResultSet();
         ResultReader resultReader = new ResultReader(resultSet, 
handle.getJobId(), handle.getResultSetId());
+        boolean printStarted = false;
+        ResponsePrinter printer = null;
         try {
             ResultJobRecord.Status status = resultReader.getStatus();
-
             final HttpResponseStatus httpStatus;
             if (status == null) {
                 httpStatus = HttpResponseStatus.NOT_FOUND;
@@ -79,6 +80,7 @@ public class QueryResultApiServlet extends 
AbstractQueryApiServlet {
                     case RUNNING:
                     case IDLE:
                     case FAILED:
+                    case REMOVED:
                         httpStatus = HttpResponseStatus.NOT_FOUND;
                         break;
                     default:
@@ -92,12 +94,13 @@ public class QueryResultApiServlet extends 
AbstractQueryApiServlet {
             }
             ResultMetadata metadata = (ResultMetadata) 
resultReader.getMetadata();
             SessionOutput sessionOutput = initResponse(request, response, 
metadata.getFormat());
-            ResponsePrinter printer = new ResponsePrinter(sessionOutput);
+            printer = new ResponsePrinter(sessionOutput);
             if (metadata.getFormat() == SessionConfig.OutputFormat.CLEAN_JSON
                     || metadata.getFormat() == 
SessionConfig.OutputFormat.LOSSLESS_JSON
                     || metadata.getFormat() == 
SessionConfig.OutputFormat.LOSSLESS_ADM_JSON) {
                 final Stats stats = new Stats();
                 printer.begin();
+                printStarted = true;
                 printer.addResultPrinter(new ResultsPrinter(appCtx, 
resultReader, null, stats, sessionOutput));
                 printer.printResults();
                 ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() 
- elapsedStart,
@@ -112,6 +115,7 @@ public class QueryResultApiServlet extends 
AbstractQueryApiServlet {
                 }
                 printer.printFooters();
                 printer.end();
+                printStarted = false;
             } else {
                 ResultUtil.printResults(appCtx, resultReader, sessionOutput, 
new Stats(), null);
             }
@@ -127,6 +131,10 @@ public class QueryResultApiServlet extends 
AbstractQueryApiServlet {
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             LOGGER.log(Level.WARN, "Error retrieving result for \"" + 
strHandle + "\"", e);
+        } finally {
+            if (printStarted && printer != null) {
+                printer.end();
+            }
         }
         if (response.writer().checkError()) {
             LOGGER.warn("Error flushing output writer for \"" + strHandle + 
"\"");
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 9a5e17eaf5..a6aef7cb1c 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -59,7 +59,7 @@
         <compilation-unit name="async-exhausted-result">
             <output-dir 
compare="Clean-JSON">async-exhausted-result</output-dir>
             <parameter name="profile" value="timings" type="string"/>
-            <expected-error>Premature end of chunk</expected-error> 
<!--TODO:REVISIT -->
+            <expected-error>HTTP/1.1 404 Not Found</expected-error>
             <source-location>false</source-location>
         </compilation-unit>
     </test-case>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
index 47984946e1..184cb7c4e2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -57,4 +57,6 @@ public interface IResultDirectory {
      * @throws Exception
      */
     IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws 
Exception;
+
+    void releaseResult(JobId jobId, ResultSetId rsId);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
index 4909903b58..f107fb802f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -32,10 +32,12 @@ public interface IResultPartitionManager extends 
IResultManager {
 
     void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, 
int partition) throws HyracksException;
 
+    void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) 
throws HyracksException;
+
     void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, 
int partition, IFrameWriter noc)
             throws HyracksException;
 
-    void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+    void removePartition(JobId jobId, ResultSetId resultSetId, int partition) 
throws HyracksException;
 
     void abortReader(JobId jobId);
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
index 71792bef69..a442c11c85 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
@@ -78,6 +78,10 @@ public class ResultDirectoryRecord implements Serializable {
         updateStatus(Status.SUCCESS);
     }
 
+    public boolean ready() {
+        return status == Status.SUCCESS;
+    }
+
     public void fail() {
         status = Status.FAILED;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 02762ee8dc..44de45f771 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -34,6 +34,7 @@ public class ResultJobRecord implements IResultStateRecord {
         IDLE,
         RUNNING,
         SUCCESS,
+        REMOVED,
         FAILED
     }
 
@@ -114,6 +115,14 @@ public class ResultJobRecord implements IResultStateRecord 
{
         updateState(State.SUCCESS);
     }
 
+    public void consume() {
+        updateState(State.REMOVED);
+    }
+
+    public boolean consumed() {
+        return status.getState() == State.REMOVED;
+    }
+
     public void fail(List<Exception> exceptions) {
         updateState(State.FAILED);
         status.setExceptions(exceptions);
@@ -166,13 +175,21 @@ public class ResultJobRecord implements 
IResultStateRecord {
 
     public synchronized void updateState() {
         int successCount = 0;
+        int consumedCount = 0;
         ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
         for (ResultDirectoryRecord record : records) {
-            if ((record != null) && (record.getStatus() == 
ResultDirectoryRecord.Status.SUCCESS)) {
-                successCount++;
+            if (record != null) {
+                if (record.ready()) {
+                    successCount++;
+                }
+                if (record.hasReachedReadEOS()) {
+                    consumedCount++;
+                }
             }
         }
-        if (successCount == records.length) {
+        if (consumedCount == records.length) {
+            consume();
+        } else if (successCount == records.length) {
             success();
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index 5dd5fff3be..1344473ed5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -66,6 +66,11 @@ public class ResultDirectory implements IResultDirectory, 
Closeable {
         return remoteResultDirectory.getResultMetadata(jobId, rsId);
     }
 
+    @Override
+    public void releaseResult(JobId jobId, ResultSetId rsId) {
+
+    }
+
     @Override
     public void close() throws IOException {
         ipc.stop();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index ed628f7ca5..f5c4a3a643 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -60,4 +60,9 @@ public class ResultDirectoryRemoteProxy implements 
IResultDirectory {
                 new 
HyracksClientInterfaceFunctions.GetResultMetadataFunction(jobId, rsId);
         return (IResultMetadata) rpci.call(ipcHandle, grmf);
     }
+
+    @Override
+    public void releaseResult(JobId jobId, ResultSetId rsId) {
+
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index d350f615d0..6d32621a4a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,6 +36,7 @@ import 
org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
 import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
 import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionConsumedWork;
 import 
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -127,6 +128,12 @@ class ClusterControllerIPCI implements IIPCI {
                 ccs.getWorkQueue().schedule(new 
ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(),
                         rrpwc.getResultSetId(), rrpwc.getPartition()));
                 break;
+            case REPORT_RESULT_PARTITION_CONSUMED:
+                CCNCFunctions.ReportResultPartitionConsumedFunction rrpc =
+                        (CCNCFunctions.ReportResultPartitionConsumedFunction) 
fn;
+                ccs.getWorkQueue().schedule(new 
ReportResultPartitionConsumedWork(ccs, rrpc.getJobId(),
+                        rrpc.getResultSetId(), rrpc.getPartition()));
+                break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf = 
(CCNCFunctions.SendApplicationMessageFunction) fn;
                 ApplicationMessageWork work =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index b1ecb45231..a986f331ee 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -43,6 +43,8 @@ public interface IResultDirectoryService extends 
IJobLifecycleListener, IResultM
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition)
             throws HyracksDataException;
 
+    public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, 
int partition) throws HyracksDataException;
+
     public void reportJobFailure(JobId jobId, List<Exception> exceptions);
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws 
HyracksDataException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index b6274d92b3..139775c353 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -154,6 +154,18 @@ public class ResultDirectoryService extends 
AbstractResultManager implements IRe
         notifyAll();
     }
 
+    @Override
+    public synchronized void reportResultPartitionConsumed(JobId jobId, 
ResultSetId rsId, int partition)
+            throws HyracksDataException {
+        ResultJobRecord djr = getNonNullResultJobRecord(jobId);
+        djr.getDirectoryRecord(partition).readEOS();
+        djr.updateState();
+        if (djr.consumed()) {
+            sweep(jobId);
+        }
+        notifyAll();
+    }
+
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> 
exceptions) {
         ResultJobRecord rjr = getResultJobRecord(jobId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
new file mode 100644
index 0000000000..5e618517bb
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionConsumedWork extends AbstractWork {
+    private final ClusterControllerService ccs;
+
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final int partition;
+
+    public ReportResultPartitionConsumedWork(ClusterControllerService ccs, 
JobId jobId, ResultSetId rsId,
+            int partition) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.rsId = rsId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+        try {
+            
ccs.getResultDirectoryService().reportResultPartitionConsumed(jobId, rsId, 
partition);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " 
Partition@" + partition;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 1c911835d6..d706c58ef8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -73,6 +73,8 @@ public interface IClusterController {
 
     void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, 
int partition) throws Exception;
 
+    void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int 
partition) throws Exception;
+
     void getNodeControllerInfos() throws Exception;
 
     void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 1939f9e27d..ace60955d4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -93,6 +93,7 @@ public class CCNCFunctions {
         REGISTER_PARTITION_REQUEST,
         REGISTER_RESULT_PARTITION_LOCATION,
         REPORT_RESULT_PARTITION_WRITE_COMPLETION,
+        REPORT_RESULT_PARTITION_CONSUMED,
 
         NODE_REGISTRATION_RESULT,
         START_TASKS,
@@ -686,6 +687,39 @@ public class CCNCFunctions {
         }
     }
 
+    public static class ReportResultPartitionConsumedFunction extends Function 
{
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        private final int partition;
+
+        public ReportResultPartitionConsumedFunction(JobId jobId, ResultSetId 
rsId, int partition) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+            this.partition = partition;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_RESULT_PARTITION_CONSUMED;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+    }
+
     public static class NodeRegistrationResult extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 09dc04d59e..5313dad496 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -44,6 +44,7 @@ import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequ
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionConsumedFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
@@ -154,6 +155,12 @@ public class ClusterControllerRemoteProxy implements 
IClusterController {
         ipcHandle.send(-1, fn, null);
     }
 
+    @Override
+    public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, 
int partition) throws Exception {
+        ReportResultPartitionConsumedFunction fn = new 
ReportResultPartitionConsumedFunction(jobId, rsId, partition);
+        ipcHandle.send(-1, fn, null);
+    }
+
     @Override
     public void notifyDeployedJobSpecFailure(DeployedJobSpecId 
deployedJobSpecId, String nodeId) throws Exception {
         ReportDeployedJobSpecFailureFunction fn = new 
ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index 165cb959d7..557b74ba22 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -109,6 +109,17 @@ public class ResultPartitionManager extends 
AbstractResultManager implements IRe
         }
     }
 
+    @Override
+    public void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int 
partition) throws HyracksException {
+        try {
+            LOGGER.trace("Reporting partition consumed: JobId: {}:ResultSetId: 
{}:partition: {}", jobId, rsId,
+                    partition);
+            
ncs.getClusterController(jobId.getCcId()).reportResultPartitionConsumed(jobId, 
rsId, partition);
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
     @Override
     public void initializeResultPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition,
             IFrameWriter writer) throws HyracksException {
@@ -137,10 +148,12 @@ public class ResultPartitionManager extends 
AbstractResultManager implements IRe
     }
 
     @Override
-    public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition) {
+    public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition)
+            throws HyracksException {
         ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, 
partition)) {
             partitionResultStateMap.remove(jobId);
+            reportPartitionConsumed(jobId, resultSetId, partition);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
index 3774530340..43042b3bd1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.partitions.ResultSetPartitionId;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
@@ -102,8 +103,9 @@ public class ResultPartitionReader {
                     final ResultSetPartitionId partitionId = 
resultState.getResultSetPartitionId();
                     
resultPartitionManager.removePartition(partitionId.getJobId(), 
partitionId.getResultSetId(),
                             partitionId.getPartition());
+
                 }
-            } catch (HyracksDataException e) {
+            } catch (HyracksException e) {
                 LOGGER.error("unexpected failure in partition reader clean 
up", e);
             }
         }

Reply via email to