This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 39a7e5728e8b2ea63129ddffb18683d2a64867cb Author: Peeyush Gupta <[email protected]> AuthorDate: Tue Oct 28 15:25:54 2025 -0700 [ASTERIXDB-3649][*DB] Async request API backward compatibility fixes - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-60882 Change-Id: I5c1df629ccb2ad15c0b50a1ae65482c53fe51adc Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20529 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> --- .../api/http/server/NCQueryResultApiServlet.java | 5 +-- .../http/server/QueryServiceRequestParameters.java | 4 +- .../api/http/server/QueryServiceServlet.java | 4 ++ .../app/message/DiscardResultRequestMessage.java | 14 +------ .../app/message/DiscardResultResponseMessage.java | 44 ---------------------- .../asterix/app/result/fields/MetricsPrinter.java | 3 +- .../async-exhausted-result.1.async.sqlpp | 1 + .../async-failed/async-failed.1.async.sqlpp | 1 + .../async-json/async-json.1.async.sqlpp | 1 + .../async-json/async-json.6.async.sqlpp | 2 +- .../async-repeated/async-repeated.1.async.sqlpp | 1 + .../async-running/async-running.1.async.sqlpp | 1 + .../async-timeout/async.1.async.sqlpp | 1 + .../async/async.1.async.sqlpp | 1 + .../async-compilation-failed.1.async.sqlpp | 1 - .../async-exhausted-result.1.async.sqlpp | 1 - .../async-failed/async-failed.1.async.sqlpp | 1 - .../async-json/async-json.1.async.sqlpp | 1 - .../async-repeated/async-repeated.1.async.sqlpp | 1 - .../async-running/async-running.1.async.sqlpp | 1 - .../async-deferred/async/async.1.async.sqlpp | 1 - .../apache/asterix/common/api/IRequestTracker.java | 16 ++++++++ .../api/result/IResultPartitionManager.java | 2 +- .../hyracks/api/result/ResultDirectoryRecord.java | 6 +-- .../control/cc/result/IResultDirectoryService.java | 2 +- .../control/cc/result/ResultDirectoryService.java | 2 +- .../ReportResultPartitionWriteCompletionWork.java | 4 +- .../control/common/base/IClusterController.java | 2 +- .../hyracks/control/common/ipc/CCNCFunctions.java | 6 +-- .../common/ipc/ClusterControllerRemoteProxy.java | 2 +- .../control/nc/result/ResultPartitionManager.java | 2 +- .../control/nc/result/ResultPartitionWriter.java | 2 +- .../hyracks/control/nc/result/ResultState.java | 10 +++-- 33 files changed, 56 insertions(+), 90 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java index c9972fc9c3..90978fbe76 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.asterix.app.message.DiscardResultRequestMessage; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.utils.AsyncRequestsAPIUtil; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -48,10 +47,8 @@ public class NCQueryResultApiServlet extends QueryResultApiServlet { protected void discardResult(String requestId, JobId jobId, ResultSetId resultSetId) throws HyracksDataException { INCServiceContext serviceCtx = (INCServiceContext) appCtx.getServiceContext(); INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); - MessageFuture messageFuture = messageBroker.registerMessageFuture(); - long futureId = messageFuture.getFutureId(); DiscardResultRequestMessage request = - new DiscardResultRequestMessage(serviceCtx.getNodeId(), futureId, jobId, resultSetId, requestId); + new DiscardResultRequestMessage(serviceCtx.getNodeId(), jobId, resultSetId, requestId); try { messageBroker.sendMessageToPrimaryCC(request); } catch (Exception e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java index 49aba9cbfe..8b48405a66 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java @@ -476,6 +476,7 @@ public class QueryServiceRequestParameters { setHost(servlet.host(request)); setPath(servlet.servletPath(request)); setOptionalParams(optionalParams); + setIncludeHost(servlet.isOldApi(request)); try { if (useRequestParameters(request)) { setFromRequestParameters(request); @@ -541,8 +542,7 @@ public class QueryServiceRequestParameters { setClientType(parseIfExists(req, Parameter.CLIENT_TYPE.str(), valGetter, getClientType(), clientTypes::get)); setSQLCompatMode(parseBoolean(req, Parameter.SQL_COMPAT.str(), valGetter, isSQLCompatMode())); setSource(valGetter.apply(req, Parameter.SOURCE.str())); - setIncludeHost(parseBoolean(req, Parameter.INCLUDE_HOST.str(), valGetter, - isIncludeHost() || getMode() == ResultDelivery.DEFERRED)); + setIncludeHost(parseBoolean(req, Parameter.INCLUDE_HOST.str(), valGetter, isIncludeHost())); } protected void setExtraParams(JsonNode jsonRequest) throws HyracksDataException { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index c0de06abee..c4956bda6a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -454,4 +454,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { protected String getApplicationVersion() { return ApplicationConfigurator.getApplicationVersion(appCtx.getBuildProperties()); } + + protected boolean isOldApi(IServletRequest request) { + return true; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java index b7486c5acd..7ca897aa77 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java @@ -20,7 +20,6 @@ package org.apache.asterix.app.message; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.utils.AsyncRequestsAPIUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -36,12 +35,9 @@ public class DiscardResultRequestMessage implements ICcAddressedMessage { private final JobId jobId; private final ResultSetId resultSetId; private final String requestId; - private final long ncReqId; - public DiscardResultRequestMessage(String nodeId, long ncReqId, JobId jobId, ResultSetId resultSetId, - String requestId) { + public DiscardResultRequestMessage(String nodeId, JobId jobId, ResultSetId resultSetId, String requestId) { this.nodeId = nodeId; - this.ncReqId = ncReqId; this.jobId = jobId; this.resultSetId = resultSetId; this.requestId = requestId; @@ -49,14 +45,6 @@ public class DiscardResultRequestMessage implements ICcAddressedMessage { @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException { - CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, jobId, resultSetId, requestId); - DiscardResultResponseMessage response = new DiscardResultResponseMessage(this.ncReqId); - try { - messageBroker.sendApplicationMessageToNC(response, nodeId); - } catch (Exception e) { - LOGGER.info("Failed to process request", e); - throw HyracksDataException.create(e); - } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultResponseMessage.java deleted file mode 100644 index d938517b8b..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultResponseMessage.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.message; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.common.messaging.api.MessageFuture; -import org.apache.asterix.messaging.NCMessageBroker; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class DiscardResultResponseMessage implements INcAddressedMessage { - - private static final long serialVersionUID = 1L; - private final long reqId; - - public DiscardResultResponseMessage(long reqId) { - this.reqId = reqId; - } - - @Override - public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - MessageFuture future = mb.deregisterMessageFuture(reqId); - if (future != null) { - future.complete(this); - } - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java index 952c897a7c..368c0dab53 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java @@ -59,6 +59,7 @@ public class MetricsPrinter implements IResponseFieldPrinter { } } + private static final Set<Metrics> ALL_METRICS = Set.of(Metrics.values()); public static final String FIELD_NAME = "metrics"; private final ResponseMetrics metrics; private final Charset resultCharset; @@ -71,7 +72,7 @@ public class MetricsPrinter implements IResponseFieldPrinter { public MetricsPrinter(ResponseMetrics metrics, Charset resultCharset, Set<Metrics> selectedMetrics) { this.metrics = metrics; this.resultCharset = resultCharset; - this.selectedMetrics = selectedMetrics == null ? Set.of(Metrics.values()) : selectedMetrics; + this.selectedMetrics = selectedMetrics == null ? ALL_METRICS : selectedMetrics; } @Override diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-exhausted-result/async-exhausted-result.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-exhausted-result/async-exhausted-result.1.async.sqlpp index b59b87a326..58c30edf58 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-exhausted-result/async-exhausted-result.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-exhausted-result/async-exhausted-result.1.async.sqlpp @@ -19,4 +19,5 @@ -- maxresultreads=1 -- handlevariable=status +-- param include-host=false select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-failed/async-failed.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-failed/async-failed.1.async.sqlpp index 3277705352..b1aef31e27 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-failed/async-failed.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-failed/async-failed.1.async.sqlpp @@ -18,5 +18,6 @@ */ -- handlevariable=status +-- param include-host=false set `import-private-functions` `true`; select value inject_failure(sleep("result", 5000), true); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.1.async.sqlpp index 97e3367ecd..298f44c785 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.1.async.sqlpp @@ -18,4 +18,5 @@ */ -- handlevariable=status +-- param include-host=false select i, i * i as i2 from range(1, 5) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp index 32ad877902..0e123c3e83 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp @@ -17,7 +17,7 @@ * under the License. */ -- handlevariable=status - +-- param include-host=false use test; SET `compiler.sort.parallel` "true"; Select * from Test order by val; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-repeated/async-repeated.1.async.sqlpp index 60e88f4777..ff1feacec7 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-repeated/async-repeated.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-repeated/async-repeated.1.async.sqlpp @@ -19,4 +19,5 @@ -- maxresultreads=2 -- handlevariable=status +-- param include-host=false select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-running/async-running.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-running/async-running.1.async.sqlpp index 3fed1e4e6d..d80bd5a776 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-running/async-running.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-running/async-running.1.async.sqlpp @@ -18,4 +18,5 @@ */ -- handlevariable=status +-- param include-host=false select value sleep("result", 5000); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-timeout/async.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-timeout/async.1.async.sqlpp index d77d375225..0a0e3ce6c8 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-timeout/async.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-timeout/async.1.async.sqlpp @@ -19,4 +19,5 @@ -- handlevariable=status -- param timeout=1s +-- param include-host=false select sleep("should not return", 60000); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async/async.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async/async.1.async.sqlpp index 5044611755..f1d4cb0308 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async/async.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async/async.1.async.sqlpp @@ -18,4 +18,5 @@ */ -- handlevariable=status +-- param include-host=false select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp index e63fd76058..66d97f2a81 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-compilation-failed/async-compilation-failed.1.async.sqlpp @@ -18,7 +18,6 @@ */ -- handlevariable=status --- param include-host=true select count(*) from gargel diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp index 48872a185e..f8ec2cfa3c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp @@ -19,6 +19,5 @@ -- maxresultreads=1 -- handlevariable=status --- param include-host=true select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp index b95cca0115..d31a17a2ad 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-failed/async-failed.1.async.sqlpp @@ -18,7 +18,6 @@ */ -- handlevariable=status --- param include-host=true set `import-private-functions` `true`; select value inject_failure(sleep("result", 5000), true); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp index 950fc1ae6e..e24253ab79 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-json/async-json.1.async.sqlpp @@ -18,6 +18,5 @@ */ -- handlevariable=status --- param include-host=true select i, i * i as i2 from range(1, 5) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp index 532844560a..8055915ec8 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp @@ -19,6 +19,5 @@ -- maxresultreads=2 -- handlevariable=status --- param include-host=true select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp index 80f4868da0..661887f3a9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-running/async-running.1.async.sqlpp @@ -18,6 +18,5 @@ */ -- handlevariable=status --- param include-host=true select value sleep("result", 5000); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp index 56b8960d5c..1e18f66f59 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async/async.1.async.sqlpp @@ -18,6 +18,5 @@ */ -- handlevariable=status --- param include-host=true select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index b27a4e0b6e..8f3ff6d3af 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -94,10 +94,26 @@ public interface IRequestTracker extends IJobLifecycleListener { */ long getTotalNumberOfFailedRequests(); + /** + * Starts tracking an asynchronous or deferred request + * + * @param request + */ void trackAsyncOrDeferredRequest(IClientRequest request); + /** + * Removes an asynchronous or deferred request from tracking + * + * @param requestId + */ void removeAsyncOrDeferredRequest(String requestId); + /** + * Gets an asynchronous or deferred request by {@code requestId} + * + * @param requestId + * @return an Optional of the client request + */ Optional<IClientRequest> getAsyncOrDeferredRequest(String requestId); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java index 65ae2ab511..800f78cea9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java @@ -30,7 +30,7 @@ public interface IResultPartitionManager extends IResultManager { void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, IResultMetadata metadata, boolean emptyResult) throws HyracksException; - void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition, int resultCount) + void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition, long resultCount) throws HyracksException; void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws HyracksException; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java index b56d36c4e4..c7071c59c3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java @@ -40,7 +40,7 @@ public class ResultDirectoryRecord implements Serializable { private boolean empty; - private int resultCount; + private long resultCount; private String nodeId; @@ -71,7 +71,7 @@ public class ResultDirectoryRecord implements Serializable { this.empty = empty; } - public void setResultCount(int resultCount) { + public void setResultCount(long resultCount) { this.resultCount = resultCount; } @@ -103,7 +103,7 @@ public class ResultDirectoryRecord implements Serializable { status = Status.FAILED; } - public int getResultCount() { + public long getResultCount() { return resultCount; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java index 69a420a25c..0366774673 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java @@ -40,7 +40,7 @@ public interface IResultDirectoryService extends IJobLifecycleListener, IResultM boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress, String nodeId) throws HyracksDataException; - public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, int resultCount) + public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, long resultCount) throws HyracksDataException; public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index f61bafc80c..5111ceaa82 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -155,7 +155,7 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe @Override public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, - int resultCount) throws HyracksDataException { + long resultCount) throws HyracksDataException { ResultJobRecord djr = getNonNullResultJobRecord(jobId); djr.getDirectoryRecord(partition).writeEOS(); djr.getDirectoryRecord(partition).setResultCount(resultCount); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java index bc161f8d20..3657fad851 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java @@ -33,10 +33,10 @@ public class ReportResultPartitionWriteCompletionWork extends AbstractWork { private final int partition; - private final int resultCount; + private final long resultCount; public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, - int partition, int resultCount) { + int partition, long resultCount) { this.ccs = ccs; this.jobId = jobId; this.rsId = rsId; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 738185712c..874947dbba 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -71,7 +71,7 @@ public interface IClusterController { void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress, String nodeId) throws Exception; - void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, int resultCount) + void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, long resultCount) throws Exception; void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 333f07cb00..8b68f91c23 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -670,10 +670,10 @@ public class CCNCFunctions { private final int partition; - private final int resultCount; + private final long resultCount; public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition, - int resultCount) { + long resultCount) { this.jobId = jobId; this.rsId = rsId; this.partition = partition; @@ -697,7 +697,7 @@ public class CCNCFunctions { return partition; } - public int getResultCount() { + public long getResultCount() { return resultCount; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 46b192484e..f4ac77df23 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -150,7 +150,7 @@ public class ClusterControllerRemoteProxy implements IClusterController { } @Override - public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, int resultCount) + public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, long resultCount) throws Exception { ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction(jobId, rsId, partition, resultCount); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java index 877db8a83a..41cd31690b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java @@ -100,7 +100,7 @@ public class ResultPartitionManager extends AbstractResultManager implements IRe } @Override - public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, int resultCount) + public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition, long resultCount) throws HyracksException { try { LOGGER.trace("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId, diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java index aec569704c..abec6bc868 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionWriter.java @@ -58,7 +58,7 @@ public class ResultPartitionWriter implements IFrameWriter { private boolean failed = false; - private int resultCount; + private long resultCount; public ResultPartitionWriter(IHyracksTaskContext ctx, IResultPartitionManager manager, JobId jobId, ResultSetId rsId, boolean asyncMode, IResultMetadata metadata, int partition, int nPartitions, diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java index 880afb7b60..ac2409611f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultState.java @@ -74,9 +74,7 @@ public class ResultState implements IStateObject { ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager, IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) { - if (maxReads < -1) { - throw new IllegalArgumentException("maxReads must be >= -1"); - } + validateMaxReads(maxReads); this.maxReads = maxReads; this.resultSetPartitionId = resultSetPartitionId; this.asyncMode = asyncMode; @@ -92,6 +90,12 @@ public class ResultState implements IStateObject { fileHandle = null; } + private void validateMaxReads(long maxReads) { + if (maxReads != UNLIMITED_READS && maxReads < 1) { + throw new IllegalArgumentException("maxReads must be >= 1 or -1 for unlimited reads"); + } + } + public synchronized void open() { size = 0; persistentSize = 0;
