This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit ecfc3329e1cc8a53f2ecd323e2a6be37890a5f1f Author: Peeyush Gupta <[email protected]> AuthorDate: Tue Oct 28 13:36:29 2025 -0700 [ASTERIXDB-3649][*DB] Improve async request API - user model changes: no - storage format changes: no - interface changes: no Details: Add a field to denote if the partitions returned as part of the status should be read in order. Ext-ref: MB-60882 Change-Id: If32b1020c41e7d70e2af07275b4f6f9beb8ad184 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20527 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../org/apache/asterix/translator/ResultMetadata.java | 9 +++++++++ .../asterix/api/http/server/QueryStatusApiServlet.java | 5 +++-- .../apache/asterix/app/result/JobResultCallback.java | 1 + .../asterix/app/result/fields/PartitionInfoPrinter.java | 9 +++++++-- .../async-json/async-json.4.ddl.sqlpp | 17 +++++++++++++---- .../async-json/async-json.5.update.sqlpp | 13 +++++++++---- .../async-json/async-json.6.async.sqlpp | 8 ++++---- .../async-json/async-json.7.pollget.http | 8 ++++---- .../async-json/async-json.8.get.http | 7 ++----- .../async-exhausted-result.2.regexjson | 1 + .../async-json/async-json.2.regexjson | 1 + .../async-json/async-json.6.ignore | 0 .../async-json.7.regexjson} | 3 ++- .../async-json/async-json.8.json | 4 ++++ .../async-repeated/async-repeated.2.regexjson | 1 + .../async-running/async-running.3.regexjson | 1 + .../async-deferred-improved/async/async.2.regexjson | 1 + .../operators/physical/AbstractExchangePOperator.java | 6 ++++++ .../physical/AbstractRangeExchangePOperator.java | 7 +++++++ ...PartialBroadcastRangeIntersectExchangePOperator.java | 7 +++++++ .../PushNestedOrderByUnderPreSortedGroupByRule.java | 2 +- .../org/apache/hyracks/api/job/HyracksJobProperty.java | 3 ++- .../org/apache/hyracks/api/result/ResultJobRecord.java | 8 +++++++- .../control/cc/result/ResultDirectoryService.java | 7 ++++++- 24 files changed, 99 insertions(+), 30 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java index de6026b25a..66b2503d71 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java @@ -47,6 +47,7 @@ public class ResultMetadata implements IResultMetadata { private long compileTime; private long createTime; private long endTime; + private boolean resultSetOrdered; public ResultMetadata(SessionConfig.OutputFormat format) { this.format = format; @@ -194,6 +195,14 @@ public class ResultMetadata implements IResultMetadata { this.endTime = endTime; } + public boolean isResultSetOrdered() { + return resultSetOrdered; + } + + public void setResultSetOrdered(boolean resultSetOrdered) { + this.resultSetOrdered = resultSetOrdered; + } + @Override public String toString() { return "ResultMetadata{" + "format=" + format + ", jobDuration=" + jobDuration + ", processedObjects=" diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java index 52597ad9d9..9052f8f655 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java @@ -110,10 +110,11 @@ public class QueryStatusApiServlet extends AbstractQueryApiServlet { } printer.addResultPrinter(new ResultHandlePrinter(resHandle)); if (uriMode) { + ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata(); printer.addResultPrinter(new ResultCountPrinter( ((ResultMetadata) (resultReader.getResultSetReader().getResultMetadata())).getResultCount())); - printer.addResultPrinter( - new PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(), resHandle)); + printer.addResultPrinter(new PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(), + resHandle, metadata.isResultSetOrdered())); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java index a941100c6f..6e25be4942 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java @@ -67,6 +67,7 @@ public class JobResultCallback implements IJobResultCallback { final ResultMetadata metadata = (ResultMetadata) resultSetMetaData.getMetadata(); metadata.setJobDuration(resultJobRecord.getJobDuration()); metadata.setResultCount(resultJobRecord.getResultCount()); + metadata.setResultSetOrdered(resultJobRecord.isResultSetOrdered()); aggregateJobStats(jobId, metadata); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/PartitionInfoPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/PartitionInfoPrinter.java index 8e8d7f2aa7..194f33e494 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/PartitionInfoPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/PartitionInfoPrinter.java @@ -29,13 +29,16 @@ public class PartitionInfoPrinter implements IResponseFieldPrinter { public static final String FIELD_NAME = "partitions"; public static final String HANDLE_FIELD_NAME = "handle"; public static final String RESULT_COUNT_FIELD_NAME = "resultCount"; + public static final String RESULTSET_ORDERED_FIELD_NAME = "resultSetOrdered"; private final ResultDirectoryRecord[] resultRecords; private final String handlePrefix; + private final boolean resultSetOrdered; - public PartitionInfoPrinter(ResultDirectoryRecord[] resultRecords, String handlePrefix) { + public PartitionInfoPrinter(ResultDirectoryRecord[] resultRecords, String handlePrefix, boolean resultSetOrdered) { this.resultRecords = resultRecords; this.handlePrefix = handlePrefix; + this.resultSetOrdered = resultSetOrdered; } @Override @@ -54,7 +57,9 @@ public class PartitionInfoPrinter implements IResponseFieldPrinter { pw.print(","); } } - pw.print("]"); + pw.print("],"); + pw.print("\n\t"); + ResultUtil.printField(pw, RESULTSET_ORDERED_FIELD_NAME, resultSetOrdered, false); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.4.ddl.sqlpp similarity index 81% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.4.ddl.sqlpp index d8bf5cba2d..b0d22c240a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.4.ddl.sqlpp @@ -16,8 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job; -public enum HyracksJobProperty implements IJobProperty { - JOB_KIND -} +drop dataverse test if exists; +create dataverse test; + +use test; + +create type TestType as + closed { + id : integer, + val : double +}; + +create dataset Test(TestType) primary key id; + diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.5.update.sqlpp similarity index 81% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.5.update.sqlpp index d8bf5cba2d..31abd4889f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.5.update.sqlpp @@ -16,8 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job; -public enum HyracksJobProperty implements IJobProperty { - JOB_KIND -} +use test; + +UPSERT INTO Test { "id": 1, "val": 2.5 }; + +UPSERT INTO Test { "id": 2, "val": 3.5 }; + +UPSERT INTO Test { "id": 3, "val": 4.5 }; + +UPSERT INTO Test { "id": 4, "val": 5.5 }; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp similarity index 88% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp index d8bf5cba2d..32ad877902 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.6.async.sqlpp @@ -16,8 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job; +-- handlevariable=status -public enum HyracksJobProperty implements IJobProperty { - JOB_KIND -} +use test; +SET `compiler.sort.parallel` "true"; +Select * from Test order by val; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.7.pollget.http similarity index 88% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.7.pollget.http index d8bf5cba2d..d10aed92c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.7.pollget.http @@ -16,8 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job; -public enum HyracksJobProperty implements IJobProperty { - JOB_KIND -} +-- polltimeoutsecs=10 +-- handlevariable=result + +$status diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.8.get.http similarity index 88% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.8.get.http index d8bf5cba2d..6496a4b61b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/async-json/async-json.8.get.http @@ -16,8 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job; - -public enum HyracksJobProperty implements IJobProperty { - JOB_KIND -} +-- extractresult=true +$result diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson index 0ed7a67c58..fbcb2c62f2 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson @@ -3,6 +3,7 @@ "handle": "R{.*}", "resultCount": 10, "partitions": "R{.*}", + "resultSetOrdered": false, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.2.regexjson index c6c829ba60..a967c365b7 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.2.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.2.regexjson @@ -3,6 +3,7 @@ "handle": "R{.*}", "resultCount": 5, "partitions": "R{.*}", + "resultSetOrdered": false, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.6.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.6.ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.7.regexjson similarity index 70% copy from asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson copy to asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.7.regexjson index 0ed7a67c58..3d4a86ccb3 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-exhausted-result/async-exhausted-result.2.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.7.regexjson @@ -1,8 +1,9 @@ { "status":"success", "handle": "R{.*}", - "resultCount": 10, + "resultCount": 4, "partitions": "R{.*}", + "resultSetOrdered": true, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.8.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.8.json new file mode 100644 index 0000000000..5ca6c1efc2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-json/async-json.8.json @@ -0,0 +1,4 @@ +{ "Test": { "id": 1, "val": 2.5 } } +{ "Test": { "id": 2, "val": 3.5 } } +{ "Test": { "id": 3, "val": 4.5 } } +{ "Test": { "id": 4, "val": 5.5 } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-repeated/async-repeated.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-repeated/async-repeated.2.regexjson index 0ed7a67c58..fbcb2c62f2 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-repeated/async-repeated.2.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-repeated/async-repeated.2.regexjson @@ -3,6 +3,7 @@ "handle": "R{.*}", "resultCount": 10, "partitions": "R{.*}", + "resultSetOrdered": false, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-running/async-running.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-running/async-running.3.regexjson index 9e724fd104..bc20bf9fe0 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-running/async-running.3.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async-running/async-running.3.regexjson @@ -3,6 +3,7 @@ "handle": "R{.*}", "resultCount": 1, "partitions": "R{.*}", + "resultSetOrdered": false, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async/async.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async/async.2.regexjson index 0ed7a67c58..fbcb2c62f2 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async/async.2.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/async/async.2.regexjson @@ -3,6 +3,7 @@ "handle": "R{.*}", "resultCount": 10, "partitions": "R{.*}", + "resultSetOrdered": false, "metrics": "R{.*}", "createdAt": "R{.*}" } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java index 6f8d5e9d54..c844f31d81 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java @@ -27,6 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSch import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.api.job.JobSpecification; public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator { @Override @@ -38,6 +39,11 @@ public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); + setJobSpecAnnotation(builder.getJobSpec()); + } + + protected void setJobSpecAnnotation(JobSpecification spec) { + // No-op } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractRangeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractRangeExchangePOperator.java index 53f3aafc63..03a49cf729 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractRangeExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractRangeExchangePOperator.java @@ -36,6 +36,8 @@ import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.partition.range.DynamicRangeMapSupplier; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMapSupplier; @@ -132,4 +134,9 @@ abstract class AbstractRangeExchangePOperator extends AbstractExchangePOperator } return new Triple<>(sortFields, comps, nkcf); } + + @Override + protected void setJobSpecAnnotation(JobSpecification spec) { + spec.setProperty(HyracksJobProperty.RESULT_SET_ORDERED, true); + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PartialBroadcastRangeIntersectExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PartialBroadcastRangeIntersectExchangePOperator.java index 4097101dce..e9de45f74d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PartialBroadcastRangeIntersectExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PartialBroadcastRangeIntersectExchangePOperator.java @@ -46,7 +46,9 @@ import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangeIntersectPartitionComputerFactory; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; import org.apache.hyracks.dataflow.common.data.partition.range.StaticRangeMapSupplier; @@ -136,6 +138,11 @@ public class PartialBroadcastRangeIntersectExchangePOperator extends AbstractExc return new Triple<>(startFields, endFields, comps); } + @Override + protected void setJobSpecAnnotation(JobSpecification spec) { + spec.setProperty(HyracksJobProperty.RESULT_SET_ORDERED, true); + } + @Override public String toString() { return getOperatorTag().toString() + " " + intervalFields + (rangeMap != null ? " RANGE_MAP:" + rangeMap : ""); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java index 344e103fb1..db855b3232 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java @@ -114,7 +114,7 @@ public class PushNestedOrderByUnderPreSortedGroupByRule implements IAlgebraicRew // ++k; } // sort2.setSortColumns(sortColumns); - sort2.computeDeliveredProperties(order2, null); + sort2.computeDeliveredProperties(order2, context); // remove order1 ILogicalOperator underOrder1 = order1.getInputs().get(0).getValue(); opRef2.setValue(underOrder1); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java index d8bf5cba2d..5d6075f1e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java @@ -19,5 +19,6 @@ package org.apache.hyracks.api.job; public enum HyracksJobProperty implements IJobProperty { - JOB_KIND + JOB_KIND, + RESULT_SET_ORDERED } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java index edda92fedb..dd568d0beb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -87,11 +87,13 @@ public class ResultJobRecord implements IResultStateRecord { private ResultSetId rsId; private ResultSetMetaData resultSetMetaData; private long resultCount; + private boolean resultSetOrdered; - public ResultJobRecord() { + public ResultJobRecord(boolean resultSetOrdered) { this.timestamp = System.nanoTime(); this.status = new Status(); this.resultCount = 0; + this.resultSetOrdered = resultSetOrdered; } private void updateState(State newStatus) { @@ -149,6 +151,10 @@ public class ResultJobRecord implements IResultStateRecord { return status; } + public boolean isResultSetOrdered() { + return resultSetOrdered; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 6a1d28fc4d..f61bafc80c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -31,6 +31,7 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; @@ -83,7 +84,11 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe if (jobResultLocations.get(jobId) != null) { throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId); } - jobResultLocations.put(jobId, new JobResultInfo(new ResultJobRecord(), null)); + Boolean partitionsOrdered = (Boolean) spec.getProperty(HyracksJobProperty.RESULT_SET_ORDERED); + if (partitionsOrdered == null) { + partitionsOrdered = false; + } + jobResultLocations.put(jobId, new JobResultInfo(new ResultJobRecord(partitionsOrdered), null)); } @Override
