This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 487dbc8d7060ce8be537cc2c52e85abe58918136 Author: Ian Maxon <[email protected]> AuthorDate: Tue Nov 14 19:12:33 2023 -0800 [ASTERIX-3283][RT] Profile micro-ops and subplans - user model changes: yes - storage format changes: no - interface changes: yes Details: Introduce the idea of a self-profiling operator, and modify MetaOperator to be one of these. Also introduce a numbering scheme for identifying a MicroOp inside a MetaOp by its position within the pipeline; e.g., the MicroOp at position 2 in MetaOp ODID:3 would be ODID:3.2. Furthermore, extend profiling into subplans, based on the MicroOp position. Subplan numbering extends the position of the Subplan MicroOp with the subplan pipeline number and the operator's position within that pipeline. For example, in Operator 3, a Subplan MicroOp at position 0 containing a unary subplan (pipeline 0) whose MicroOp is at position 3 would be ODID:3.0.0.3. Also introduce interfaces for some of the profiling methods rather than using concrete classes, and simplify the passing of parent stats between IProfiledOperatorNodePushable instances by using the input/output instead of passing it as a constructor parameter. Change-Id: Ib266878bf05782506045abfadaa83b41f0f9598b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17864 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../test/runtime/ProfiledExecutionTest.java | 3 - .../src/test/resources/runtimets/profiled.xml | 6 + .../non-unary-subplan.1.ddl.sqlpp} | 14 +- .../non-unary-subplan.2.update.sqlpp} | 15 +- .../non-unary-subplan.3.profile.sqlpp} | 18 +- .../profile/sleep/sleep.5.profile.sqlpp | 2 +- .../profile/full-scan/full-scan.3.regexjson | 18 ++ .../non-unary-subplan.3.regexjson | 241 +++++++++++++++++++++ .../results/profile/sleep/sleep.3.regexjson | 28 ++- .../results/profile/sleep/sleep.4.regexjson | 54 ++++- .../results/profile/sleep/sleep.5.regexjson | 60 ++++- 
.../common/ClosedRecordConstructorEvalFactory.java | 5 + .../DatasetStreamStatsOperatorDescriptor.java | 4 +- .../LogicalOperatorPrettyPrintVisitorJson.java | 118 ++++++++-- .../algebricks/core/jobgen/impl/JobBuilder.java | 34 ++- .../runtime/base/IPushRuntimeFactory.java | 1 + .../runtime/base/ProfiledPushRuntime.java | 95 ++++++++ .../meta/AlgebricksMetaOperatorDescriptor.java | 113 +++++++++- .../runtime/operators/meta/PipelineAssembler.java | 24 +- .../operators/meta/SubplanRuntimeFactory.java | 29 ++- .../api/dataflow/ISelfProfilingNodePushable.java | 15 +- .../api/dataflow/IStatsContainingNodePushable.java | 12 +- .../apache/hyracks/api/dataflow/ITimedWriter.java | 15 +- .../hyracks/api/dataflow/ProfiledFrameWriter.java | 59 ++--- .../api/dataflow/ProfiledOperatorNodePushable.java | 49 +++-- .../hyracks/api/job/profiling/IOperatorStats.java | 16 +- .../api/job/profiling/NoOpOperatorStats.java | 16 +- .../hyracks/api/job/profiling/OperatorStats.java | 45 ++-- .../runtime/SuperActivityOperatorNodePushable.java | 17 +- .../common/job/profiling/StatsCollector.java | 12 +- .../common/job/profiling/om/JobProfile.java | 4 +- .../common/job/profiling/om/TaskProfile.java | 6 +- ...aryOutputIntrospectingOperatorNodePushable.java | 14 +- 33 files changed, 948 insertions(+), 214 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java index 7c945fc898..413f002061 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java @@ -30,9 +30,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -/** - * Runs the cluster state runtime tests with the storage parallelism. 
- */ @RunWith(Parameterized.class) public class ProfiledExecutionTest { protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf"; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml index a404639539..d3664d4cc1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml @@ -44,5 +44,11 @@ <output-dir compare="Text">sleep</output-dir> </compilation-unit> </test-case> + <test-case FilePath="profile"> + <compilation-unit name="non-unary-subplan"> + <parameter name="profile" value="timings" type="string"/> + <output-dir compare="Text">non-unary-subplan</output-dir> + </compilation-unit> + </test-case> </test-group> </test-suite> \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp similarity index 83% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp index 1358a62144..c1ce4a377a 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp @@ -17,11 +17,11 @@ * under the License. 
*/ --- compareunorderedarray=true -USE test; +drop dataverse test if exists; +create dataverse test; +use test; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +create type TType as open +{ id: bigint }; + +create dataset TData (TType) primary key id; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp similarity index 83% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp index 1358a62144..3ce9554d65 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp @@ -17,11 +17,12 @@ * under the License. 
*/ --- compareunorderedarray=true -USE test; +use test; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +insert into TData ( [ +{'id':1, 'x':1, 'f':19}, +{'id':2, 'x':2, 'f':12}, +{'id':3, 'x':1, 'f':10}, +{'id':4, 'x':2, 'f':17}, +{'id':5, 'x':1, 'f':12} +]); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp similarity index 83% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp index 1358a62144..4fa986c122 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp @@ -18,10 +18,16 @@ */ -- compareunorderedarray=true -USE test; +use test; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +select value +array_sum(( + select value a.f + from g as p + union all + select value a.f + from g as w +)) +from TData as a +group by a.x as x group as g +; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp index 1358a62144..6c35376332 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp 
@@ -24,4 +24,4 @@ SELECT count(*) AS customers, city FROM Customers c WHERE c.age <65 GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +ORDER BY sleep(city,1700); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson index 88107b0c07..03900f38a9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson @@ -65,6 +65,24 @@ "attempt": "R{[0-9]+}", "partition-send-profile": [], "counters": [ + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, { "name": "R{.+}", "run-time": "R{[0-9.]+}", diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson new file mode 100644 index 0000000000..e59f09565b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson @@ -0,0 +1,241 @@ +{ + "job-id": "R{[A-Z0-9.:]+}", + "create-time": "R{[0-9.]+}", + "start-time": "R{[0-9.]+}", + "queued-time": "R{.+}", + "end-time": "R{[0-9.]+}", + "counters": [], + "joblets": [ + { + "node-id": "R{.+}", + "counters": [], + "tasks": [ + { + "activity-id": "R{[A-Z0-9.:]+}", + "partition": "R{[0-9]+}", 
+ "attempt": "R{[0-9]+}", + "partition-send-profile": [], + "counters": [ + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}" + } + ] + }, + { + "activity-id": "R{[A-Z0-9.:]+}", + "partition": "R{[0-9]+}", + "attempt": "R{[0-9]+}", + "partition-send-profile": [], + "counters": [ + { + "name": "R{ANID:ODID:[0-9]:0\\.1 - MicroOp Subplan(?:.|\n)+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": 
"R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}" + } + ] + }, + { + "activity-id": "R{[A-Z0-9.:]+}", + "partition": "R{[0-9]+}", + "attempt": "R{[0-9]+}", + "partition-send-profile": [ + { + "partition-id": { + "job-id": "R{[A-Z0-9.:]+}", + "connector-id": "R{[A-Z0-9.:]+}", + "sender-index": "R{[0-9]+}", + "receiver-index": "R{[0-9]+}" + }, + "open-time": 
"R{[0-9]+}", + "close-time": "R{[0-9]+}", + "offset": "R{[0-9]+}", + "frame-times": [ + 0 + ], + "resolution": 1 + } + ], + "counters": [ + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + } + ] + } + ] + }] +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson index bfb8c62f07..e6d1c0a7b6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson @@ -16,6 +16,15 @@ "attempt": "R{[0-9]+}", "partition-send-profile": [], "counters": [ + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, { "name": "R{.+}", "run-time": "R{[0-9.]+}", @@ -32,6 +41,15 @@ "min-tuple-size": "R{[0-9.]+}", "max-tuple-size": "R{[0-9.]+}" }, + { + 
"name": "R{.+}", + "run-time": "R{5.+}", + "runtime-id": "R{.+}", + "cardinality-out": 10, + "avg-tuple-size": 38, + "min-tuple-size": 38, + "max-tuple-size": 38 + }, { "name": "R{.+}", "run-time": "R{[0-9.]+}", @@ -43,12 +61,12 @@ }, { "name": "R{.+}", - "run-time": "R{5.+}", + "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}", - "cardinality-out": 10, - "avg-tuple-size": 25, - "min-tuple-size": 25, - "max-tuple-size": 25 + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" } ] } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson index 012133ee2e..e3e76477d7 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson @@ -80,6 +80,15 @@ "name": "R{.+}", "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" } ] }, @@ -93,10 +102,24 @@ "name": "R{.+}", "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}", - "cardinality-out": "R{[0-9.]+}", - "avg-tuple-size": "R{[0-9.]+}", - "min-tuple-size": "R{[0-9.]+}", - "max-tuple-size": "R{[0-9.]+}" + "cardinality-out": 10, + "avg-tuple-size": 140, + "min-tuple-size": 137, + "max-tuple-size": 151 + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": 5, + "avg-tuple-size": 145, + "min-tuple-size": 142, + "max-tuple-size": 151 + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}" }, { "name": "R{.+}", @@ -109,19 +132,32 @@ "min-tuple-size": "R{[0-9.]+}", "max-tuple-size": "R{[0-9.]+}" }, + { + 
"name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": 5, + "avg-tuple-size": 16, + "min-tuple-size": 14, + "max-tuple-size": 19 + }, { "name": "R{.+}", "run-time": "R{5.+}", "runtime-id": "R{.+}", - "cardinality-out": "R{[0-9.]+}", - "avg-tuple-size": "R{[0-9.]+}", - "min-tuple-size": "R{[0-9.]+}", - "max-tuple-size": "R{[0-9.]+}" + "cardinality-out": 5, + "avg-tuple-size": 161, + "min-tuple-size": 156, + "max-tuple-size": 170 }, { "name": "R{.+}", "run-time": "R{[0-9.]+}", - "runtime-id": "R{.+}" + "runtime-id": "R{.+}", + "cardinality-out": 1, + "avg-tuple-size": 0, + "min-tuple-size": 0, + "max-tuple-size": 0 } ] } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson index b024312fd7..98d7930438 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson @@ -58,7 +58,16 @@ "attempt": "R{[0-9]+}", "partition-send-profile": [], "counters": [ - { + { + "name": "R{.+}", + "run-time": "R{5.+}", + "runtime-id": "R{.+}", + "cardinality-out": 3, + "avg-tuple-size": 75, + "min-tuple-size": 67, + "max-tuple-size": 82 + }, + { "name": "R{.+}", "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}", @@ -69,7 +78,25 @@ }, { "name": "R{.+}", - "run-time": "R{5.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}", "cardinality-out": 
"R{[0-9.]+}", "avg-tuple-size": "R{[0-9.]+}", @@ -140,6 +167,33 @@ "attempt": "R{[0-9]+}", "partition-send-profile": [], "counters": [ + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, + { + "name": "R{.+}", + "run-time": "R{[0-9.]+}", + "runtime-id": "R{.+}", + "cardinality-out": "R{[0-9.]+}", + "avg-tuple-size": "R{[0-9.]+}", + "min-tuple-size": "R{[0-9.]+}", + "max-tuple-size": "R{[0-9.]+}" + }, { "name": "R{.+}", "run-time": "R{[0-9.]+}", @@ -162,7 +216,7 @@ }, { "name": "R{.+}", - "run-time": "R{[0-9].+}", + "run-time": "R{[0-9.]+}", "runtime-id": "R{.+}", "cardinality-out": "R{[0-9.]+}", "avg-tuple-size": "R{[0-9.]+}", diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java index 2347fa5aca..f22f5da0ab 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java @@ -45,6 +45,11 @@ public class ClosedRecordConstructorEvalFactory implements IScalarEvaluatorFacto this.recType = recType; } + @Override + public String toString() { + return "ClosedRecordConstructor"; + } + @Override public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException { int n = args.length / 2; diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java index ef9e75b310..f9d75b510c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java @@ -19,6 +19,8 @@ package org.apache.asterix.runtime.operators; +import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID; + import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -83,7 +85,7 @@ public final class DatasetStreamStatsOperatorDescriptor extends AbstractSingleAc writer.open(); IStatsCollector coll = ctx.getStatsCollector(); if (coll != null) { - coll.add(new OperatorStats(operatorName)); + coll.add(new OperatorStats(operatorName, INVALID_ODID)); } INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); indexStats = new HashMap<>(); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index 32c1464208..5c532d3665 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import 
org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -82,7 +83,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; -import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.exceptions.ErrorCode; import com.fasterxml.jackson.core.JsonFactory; @@ -107,7 +108,7 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates"; private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>(); private Map<Object, String> log2odid = Collections.emptyMap(); - private Map<String, ProfileInfo> profile = Collections.emptyMap(); + private Map<String, OperatorProfile> profile = Collections.emptyMap(); private final IdCounter idCounter = new IdCounter(); private final JsonGenerator jsonGenerator; @@ -155,15 +156,88 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat } } - private class ProfileInfo { - Map<Integer, Pair<Double, Double>> activities; + private class ExtendedActivityId { + private final OperatorDescriptorId odId; + private final int id; + private final int microId; + private final int subPipe; + private final int subId; - ProfileInfo() { + ExtendedActivityId(String str) { + if (str.startsWith("ANID:")) { + str = str.substring(5); + int idIdx = str.lastIndexOf(':'); + this.odId = OperatorDescriptorId.parse(str.substring(0, idIdx)); + String[] parts = str.substring(idIdx + 1).split("\\."); + this.id = Integer.parseInt(parts[0]); + if (parts.length >= 2) { + this.microId = 
Integer.parseInt(parts[1]); + } else { + this.microId = -1; + } + if (parts.length >= 4) { + this.subPipe = Integer.parseInt(parts[2]); + this.subId = Integer.parseInt(parts[3]); + } else { + this.subPipe = -1; + this.subId = -1; + } + } else { + throw new IllegalArgumentException("Unable to parse: " + str); + } + } + + @Override + public int hashCode() { + return Objects.hash(values()); + } + + @Override + public boolean equals(Object o) { + return (o instanceof ExtendedActivityId) && Objects.equals(((ExtendedActivityId) o).values(), values()); + } + + private List<?> values() { + return List.of(odId, id, microId, subPipe, subId); + } + + @Override + public String toString() { + return "ANID:" + odId + ":" + getLocalId(); + } + + private void catenateId(StringBuilder sb, int i) { + if (sb.length() == 0) { + sb.append(i); + return; + } + sb.append("."); + sb.append(i); + } + + public String getLocalId() { + StringBuilder sb = new StringBuilder(); + catenateId(sb, odId.getId()); + if (microId > 0) { + catenateId(sb, microId); + } + if (subId > 0) { + catenateId(sb, subPipe); + catenateId(sb, subId); + } + return sb.toString(); + } + } + + private class OperatorProfile { + Map<String, Pair<Double, Double>> activities; + + OperatorProfile() { activities = new HashMap<>(); } - void visit(int id, double time) { - Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time)); + void updateOperator(String extendedOpId, double time) { + Pair<Double, Double> times = activities.computeIfAbsent(extendedOpId, i -> new Pair(time, time)); if (times.getFirst() > time) { times.setFirst(time); } @@ -173,26 +247,27 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat } } - private static ActivityId acIdFromName(String name) { + private ExtendedActivityId acIdFromName(String name) { String[] parts = name.split(" - "); - return ActivityId.parse(parts[0]); + return new ExtendedActivityId(parts[0]); } - Map<String, ProfileInfo> 
processProfile(ObjectNode profile) { - Map<String, ProfileInfo> profiledOps = new HashMap<>(); + Map<String, OperatorProfile> processProfile(ObjectNode profile) { + Map<String, OperatorProfile> profiledOps = new HashMap<>(); for (JsonNode joblet : profile.get("joblets")) { for (JsonNode task : joblet.get("tasks")) { for (JsonNode counters : task.get("counters")) { - ProfileInfo info = - profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo()); - info.visit(acIdFromName(counters.get("name").asText()).getLocalId(), + OperatorProfile info = profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), + i -> new OperatorProfile()); + info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(), counters.get("run-time").asDouble()); } for (JsonNode partition : task.get("partition-send-profile")) { String id = partition.get("partition-id").get("connector-id").asText(); - ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo()); + OperatorProfile info = profiledOps.computeIfAbsent(id, i -> new OperatorProfile()); //CDIDs are unique - info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble()); + info.updateOperator("0", + partition.get("close-time").asDouble() - partition.get("open-time").asDouble()); } } } @@ -268,15 +343,16 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat String od = log2odid.get(op); if (od != null) { jsonGenerator.writeStringField("runtime-id", od); - ProfileInfo info = profile.get(od); + OperatorProfile info = profile.get(od); if (info != null) { if (info.activities.size() == 1) { - jsonGenerator.writeNumberField("min-time", info.activities.get(0).first); - jsonGenerator.writeNumberField("max-time", info.activities.get(0).second); + Pair<Double, Double> minMax = info.activities.values().iterator().next(); + jsonGenerator.writeNumberField("min-time", minMax.first); + jsonGenerator.writeNumberField("max-time", 
minMax.second); } else { jsonGenerator.writeObjectFieldStart("times"); - for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) { - jsonGenerator.writeObjectFieldStart(ac.getKey().toString()); + for (Map.Entry<String, Pair<Double, Double>> ac : info.activities.entrySet()) { + jsonGenerator.writeObjectFieldStart(ac.getKey()); jsonGenerator.writeNumberField("min-time", ac.getValue().first); jsonGenerator.writeNumberField("max-time", ac.getValue().second); jsonGenerator.writeEndObject(); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index d8bf19073c..99ef8d00c5 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -38,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; @@ -148,11 +150,39 @@ public class JobBuilder implements IHyracksJobBuilder { hyracksOps.put(op, opDesc); } + private String getExtendedOdidForMetaOp(ILogicalOperator op, int k) { + String base = metaAsterixOps.get(k).getOperatorId().toString(); + Pair<IPushRuntimeFactory, RecordDescriptor> fact = microOps.get(op); + List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpPipeline = metaAsterixOpSkeletons.get(k); + int pos = metaOpPipeline.indexOf(fact); + return base + "." + pos; + } + + private void getExtendedOdidForSubplanOp(SubplanOperator op, Map<ILogicalOperator, String> log2phys) { + String baseId = getExtendedOdidForMetaOp(op, algebraicOpBelongingToMetaAsterixOp.get(op)); + op.getNestedPlans().forEach(plan -> plan.getRoots() + .forEach(root -> getExtendedOdidForOperator(baseId, root.getValue(), log2phys, 0))); + } + + private int getExtendedOdidForOperator(String baseId, ILogicalOperator op, Map<ILogicalOperator, String> log2phys, + int input) { + List<Mutable<ILogicalOperator>> inputs = op.getInputs(); + List<Integer> paths = new ArrayList<>(inputs.size()); + for (int i = 0; i < inputs.size(); i++) { + ILogicalOperator nextOp = inputs.get(i).getValue(); + paths.add(i, getExtendedOdidForOperator(baseId, nextOp, log2phys, i + input)); + } + int lPath = paths.size() > 0 ? Collections.max(paths) : 0; + log2phys.put(op, baseId + "." + input + "." 
+ lPath); + return lPath + 1; + } + public Map<Object, String> getLogical2PhysicalMap() { Map<ILogicalOperator, String> mergedOperatorMap = new HashMap<>(); hyracksOps.forEach(((k, v) -> mergedOperatorMap.put(k, v.getOperatorId().toString()))); - algebraicOpBelongingToMetaAsterixOp - .forEach((k, v) -> mergedOperatorMap.put(k, metaAsterixOps.get(v).getOperatorId().toString())); + algebraicOpBelongingToMetaAsterixOp.forEach((k, v) -> mergedOperatorMap.put(k, getExtendedOdidForMetaOp(k, v))); + microOps.keySet().stream().filter(op -> op instanceof SubplanOperator) + .forEach(op -> getExtendedOdidForSubplanOp((SubplanOperator) op, mergedOperatorMap)); connectors.forEach((k, v) -> mergedOperatorMap.put(k, v.getFirst().getConnectorId().toString())); return Collections.unmodifiableMap(mergedOperatorMap); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java index 82b6f9c249..b1bd46b063 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; public interface IPushRuntimeFactory extends Serializable { + IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException; default void setSourceLocation(SourceLocation sourceLoc) { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java new file mode 100644 index 0000000000..354f172c1c --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.runtime.base; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.dataflow.ITimedWriter; +import org.apache.hyracks.api.dataflow.ProfiledFrameWriter; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IOperatorStats; + +public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime { + + private final IPushRuntime wrapped; + private final IOperatorStats stats; + + private final boolean last; + + private final Map<Integer, ITimedWriter> outputs; + + public ProfiledPushRuntime(IPushRuntime push, IOperatorStats stats, boolean last) { + super(push); + outputs = new HashMap<>(); + this.wrapped = push; + this.stats = stats; + this.last = last; + } + + @Override + public void close() throws HyracksDataException { + super.close(); + long ownTime = getTotalTime(); + //for micro union all. accumulate the time of each input into the counter. + //then, on input 0, subtract the output from the accumulated time. 
+ if (!last) { + stats.getTimeCounter().update(ownTime); + return; + } + ownTime += stats.getTimeCounter().get(); + for (ITimedWriter w : outputs.values()) { + ownTime -= w.getTotalTime(); + } + stats.getTimeCounter().set(ownTime); + } + + public IOperatorStats getStats() { + return stats; + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + if (writer instanceof ITimedWriter) { + ITimedWriter wrapper = (ITimedWriter) writer; + wrapper.setUpstreamStats(stats); + outputs.put(index, wrapper); + } + wrapped.setOutputFrameWriter(index, writer, recordDesc); + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + wrapped.setInputRecordDescriptor(index, recordDescriptor); + } + + public static IPushRuntime time(IPushRuntime push, IOperatorStats stats, boolean last) throws HyracksDataException { + if (!(push instanceof ProfiledPushRuntime)) { + return new ProfiledPushRuntime(push, stats, last); + } else { + return push; + } + } + + public static IPushRuntime time(IPushRuntime push, IOperatorStats stats) throws HyracksDataException { + return time(push, stats, true); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index b3fef7f117..2a3fa7ee19 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -19,26 +19,34 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import java.nio.ByteBuffer; +import 
java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.ITimedWriter; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.api.job.profiling.NoOpOperatorStats; +import org.apache.hyracks.api.job.profiling.OperatorStats; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; import com.fasterxml.jackson.databind.node.ObjectNode; public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 3L; // array of factories for building the local runtime pipeline private final AlgebricksPipeline pipeline; @@ -85,6 +93,68 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper } } + private static String makeStatName(String base, String name, int pos, int input, int subPlan, int subPos) { + StringBuilder sb = new StringBuilder(); + sb.append(base); + sb.append("."); + sb.append(pos); + if (subPlan >= 
0) { + sb.append("."); + sb.append(subPlan); + sb.append("."); + sb.append(subPos); + sb.append(" - Subplan "); + } else { + sb.append(" - MicroOp "); + } + sb.append(name); + if (input >= 0) { + sb.append(" input ["); + sb.append(input); + sb.append("] "); + } + return sb.toString(); + } + + private static String makeId(String base, int id, int subPlan, int subPos) { + return base + "." + id + (subPlan >= 0 ? "." + subPlan : "") + (subPos >= 0 ? "." + subPos : ""); + } + + private static IOperatorStats makeStatForRuntimeFact(IPushRuntimeFactory factory, String base, String baseId, + int pos, int subPlan, int subPos) { + return new OperatorStats(makeStatName(base, factory.toString(), pos, -1, subPlan, subPos), + makeId(baseId, pos, subPlan, subPos)); + } + + public static Map<IPushRuntimeFactory, IOperatorStats> makeMicroOpStats(AlgebricksPipeline pipe, + IOperatorStats outerStats) { + Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>(); + String baseName = outerStats.getName().split(" - ")[0]; + String baseId = outerStats.getOperatorId(); + List<SubplanRuntimeFactory> subplans = new ArrayList<>(); + for (int i = 0; i < pipe.getRuntimeFactories().length; i++) { + IPushRuntimeFactory fact = pipe.getRuntimeFactories()[i]; + //TODO: don't use instanceof + if (fact instanceof SubplanRuntimeFactory) { + SubplanRuntimeFactory subplanFact = (SubplanRuntimeFactory) fact; + subplans.add(subplanFact); + List<AlgebricksPipeline> pipelines = subplanFact.getPipelines(); + for (AlgebricksPipeline p : pipelines) { + IPushRuntimeFactory[] subplanFactories = p.getRuntimeFactories(); + for (int j = subplanFactories.length - 1; j > 0; j--) { + microOpStats.put(subplanFactories[j], makeStatForRuntimeFact(subplanFactories[j], baseName, + baseId, i, pipelines.indexOf(p), j)); + } + } + } + microOpStats.put(fact, makeStatForRuntimeFact(fact, baseName, baseId, i, -1, -1)); + } + for (SubplanRuntimeFactory sub : subplans) { + sub.setStats(microOpStats); + } + return 
microOpStats; + } + private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable { private final IHyracksTaskContext ctx; @@ -99,7 +169,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null; PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor); - startOfPipeline = pa.assemblePipeline(writer, ctx); + startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>()); HyracksDataException exception = null; try { startOfPipeline.open(); @@ -126,16 +196,18 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper public String getDisplayName() { return "Empty Tuple Source"; } + } private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider) { - return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + return new AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable() { private IFrameWriter startOfPipeline; private boolean opened = false; + private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE; + private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>(); - @Override public void open() throws HyracksDataException { if (startOfPipeline == null) { RecordDescriptor pipelineOutputRecordDescriptor = @@ -144,7 +216,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0); PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); - startOfPipeline = pa.assemblePipeline(writer, ctx); + startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats); } opened = true; startOfPipeline.open(); @@ -174,10 +246,39 @@ public class 
AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper startOfPipeline.flush(); } + @Override + public void deinitialize() throws HyracksDataException { + super.deinitialize(); + } + @Override public String toString() { return AlgebricksMetaOperatorDescriptor.this.toString(); } + + @Override + public void addStats(IOperatorStats stats) throws HyracksDataException { + microOpStats = makeMicroOpStats(pipeline, stats); + for (IOperatorStats stat : microOpStats.values()) { + ctx.getStatsCollector().add(stat); + } + } + + @Override + public void setUpstreamStats(IOperatorStats stats) { + parentStats = stats; + } + + @Override + public long getTotalTime() { + return startOfPipeline instanceof ITimedWriter ? ((ITimedWriter) startOfPipeline).getTotalTime() : 0; + } + + @Override + public IOperatorStats getStats() { + IPushRuntimeFactory[] facts = pipeline.getRuntimeFactories(); + return microOpStats.getOrDefault(facts[facts.length - 1], NoOpOperatorStats.INSTANCE); + } }; } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 858fcfa566..0ce2ff523a 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -25,6 +25,8 @@ import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime; +import 
org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.EnforceFrameWriter; @@ -32,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.profiling.IOperatorStats; public class PipelineAssembler { @@ -55,6 +58,11 @@ public class PipelineAssembler { } public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { + return assemblePipeline(writer, ctx, new HashMap<>()); + } + + public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx, + Map<IPushRuntimeFactory, IOperatorStats> microOpStats) throws HyracksDataException { // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); @@ -67,7 +75,21 @@ public class PipelineAssembler { IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); for (int j = 0; j < newRuntimes.length; j++) { - if (enforce) { + //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline + //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow + boolean shouldProfile = !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) && profile + && microOpStats.containsKey(runtimeFactory); + if (shouldProfile) { + ProfiledPushRuntime profiled; + if (j == 0) { + profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j], + microOpStats.get(runtimeFactory)); + } else { + profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j], 
+ microOpStats.get(runtimeFactory), false); + } + newRuntimes[j] = profiled; + } else if (enforce && !profile) { newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]); } if (i == runtimeFactories.length - 1) { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 349e50ff62..7feca3c7ab 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -20,7 +20,9 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; @@ -32,16 +34,19 @@ import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneO import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ProfiledFrameWriter; import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.profiling.IOperatorStats; import 
org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 3L; private final List<AlgebricksPipeline> pipelines; @@ -51,6 +56,8 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto private final IMissingWriterFactory[] missingWriterFactories; + private final Map<IPushRuntimeFactory, IOperatorStats> stats; + public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories, RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) { super(projectionList); @@ -61,6 +68,7 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto if (projectionList != null) { throw new NotImplementedException(); } + this.stats = new HashMap<>(); } @Override @@ -78,6 +86,14 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto return sb.toString(); } + public List<AlgebricksPipeline> getPipelines() { + return pipelines; + } + + public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) { + this.stats.putAll(stats); + } + @Override public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { @@ -92,8 +108,11 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto boolean first; + boolean profile; + SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { this.ctx = ctx; + this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); this.first = true; IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length]; @@ -114,6 +133,11 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto if (i == 
0) { // primary pipeline outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters); + //this is necessary to track the output of the last operator to the outer product, + //i.e. the last real operator in pipeline 0 of the subplan + if (profile) { + outputWriter = new ProfiledFrameWriter(outputWriter); + } outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc; } else { // secondary pipeline @@ -127,7 +151,8 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto } PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor); - startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx); + IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats); + startOfPipelines[i] = (NestedTupleSourceRuntime) head; pipelineAssemblers[i] = pa; } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java similarity index 71% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java index 1358a62144..7402179122 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java @@ -17,11 +17,12 @@ * under the License. 
*/ --- compareunorderedarray=true -USE test; +package org.apache.hyracks.api.dataflow; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IOperatorStats; + +public interface ISelfProfilingNodePushable extends IStatsContainingNodePushable { + void addStats(IOperatorStats stats) throws HyracksDataException; + +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java similarity index 81% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java index 1358a62144..96a3ae9857 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java @@ -16,12 +16,10 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.hyracks.api.dataflow; --- compareunorderedarray=true -USE test; +import org.apache.hyracks.api.job.profiling.IOperatorStats; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +public interface IStatsContainingNodePushable { + IOperatorStats getStats(); +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java similarity index 74% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java index 1358a62144..7b0f8c8a0c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.hyracks.api.dataflow; --- compareunorderedarray=true -USE test; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.job.profiling.IOperatorStats; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +public interface ITimedWriter extends IFrameWriter { + void setUpstreamStats(IOperatorStats stats); + + long getTotalTime(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java index dc53bcae05..cfb0e7b0e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.api.dataflow; +import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID; + import java.nio.ByteBuffer; import org.apache.hyracks.api.com.job.profiling.counters.Counter; @@ -28,30 +30,33 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.job.profiling.NoOpOperatorStats; import org.apache.hyracks.api.job.profiling.OperatorStats; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.util.HyracksRunnable; import org.apache.hyracks.api.util.HyracksThrowingConsumer; import org.apache.hyracks.util.IntSerDeUtils; -public class ProfiledFrameWriter implements IFrameWriter { +public class ProfiledFrameWriter implements ITimedWriter { // The downstream data consumer of this writer. 
private final IFrameWriter writer; - private final ICounter tupleCounter; - private final IOperatorStats parentStats; + protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE; private int minSz = Integer.MAX_VALUE; private int maxSz = -1; private long avgSz; private ICounter totalTime; - public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) { + public ProfiledFrameWriter(IFrameWriter writer) { this.writer = writer; - this.parentStats = parentStats; - this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null; this.totalTime = new Counter("totalTime"); } + @Override + public void setUpstreamStats(IOperatorStats stats) { + this.upstreamStats = stats; + } + public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException { long nt = 0; try { @@ -80,25 +85,24 @@ public class ProfiledFrameWriter implements IFrameWriter { private void updateTupleStats(ByteBuffer buffer) { int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit()); int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset); - if (tupleCounter != null) { - long prevCount = tupleCounter.get(); - for (int i = 0; i < tupleCount; i++) { - int tupleLen = getTupleLength(i, tupleCountOffset, buffer); - if (maxSz < tupleLen) { - maxSz = tupleLen; - } - if (minSz > tupleLen) { - minSz = tupleLen; - } - long prev = avgSz * prevCount; - avgSz = (prev + tupleLen) / (prevCount + 1); - prevCount++; + ICounter tupleCounter = upstreamStats.getTupleCounter(); + long prevCount = tupleCounter.get(); + for (int i = 0; i < tupleCount; i++) { + int tupleLen = getTupleLength(i, tupleCountOffset, buffer); + if (maxSz < tupleLen) { + maxSz = tupleLen; } - parentStats.getMaxTupleSz().set(maxSz); - parentStats.getMinTupleSz().set(minSz); - parentStats.getAverageTupleSz().set(avgSz); - tupleCounter.update(tupleCount); + if (minSz > tupleLen) { + minSz = tupleLen; + } + long prev = avgSz * prevCount; + avgSz = (prev + tupleLen) / 
(prevCount + 1); + prevCount++; } + upstreamStats.getMaxTupleSz().set(maxSz); + upstreamStats.getMinTupleSz().set(minSz); + upstreamStats.getAverageTupleSz().set(avgSz); + tupleCounter.update(tupleCount); } @Override @@ -140,14 +144,15 @@ public class ProfiledFrameWriter implements IFrameWriter { throws HyracksDataException { if (!(writer instanceof ProfiledFrameWriter)) { IStatsCollector statsCollector = ctx.getStatsCollector(); - IOperatorStats stats = new OperatorStats(name); + IOperatorStats stats = new OperatorStats(name, INVALID_ODID); statsCollector.add(stats); - return new ProfiledFrameWriter(writer, null); - - } else + return new ProfiledFrameWriter(writer); + } else { return writer; + } } + @Override public long getTotalTime() { return totalTime.get(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java index bde5611855..1984d8e07f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java @@ -32,20 +32,16 @@ import org.apache.hyracks.api.job.profiling.OperatorStats; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable; -public class ProfiledOperatorNodePushable implements IOperatorNodePushable { +public class ProfiledOperatorNodePushable implements IOperatorNodePushable, IStatsContainingNodePushable { private final IOperatorNodePushable op; - private final Map<Integer, ProfiledFrameWriter> inputs; - private final Map<Integer, ProfiledOperatorNodePushable> parents; - private final Map<Integer, ProfiledFrameWriter> outputs; + private final Map<Integer, ITimedWriter> inputs; + 
private final Map<Integer, ITimedWriter> outputs; private final IOperatorStats stats; private final ICounter totalTime; - ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats, - ProfiledOperatorNodePushable parentOp) { + ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats) { this.stats = stats; - this.parents = new HashMap<>(); - parents.put(0, parentOp); this.op = op; inputs = new HashMap<>(); outputs = new HashMap<>(); @@ -60,10 +56,10 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable { @Override public void deinitialize() throws HyracksDataException { long ownTime = totalTime.get(); - for (ProfiledFrameWriter i : inputs.values()) { + for (ITimedWriter i : inputs.values()) { ownTime += i.getTotalTime(); } - for (ProfiledFrameWriter w : outputs.values()) { + for (ITimedWriter w : outputs.values()) { ownTime -= w.getTotalTime(); } op.deinitialize(); @@ -78,8 +74,13 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable { @Override public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException { - if (writer instanceof ProfiledFrameWriter) { - ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer; + if (writer instanceof ITimedWriter) { + ITimedWriter wrapper = (ITimedWriter) writer; + if (op instanceof ISelfProfilingNodePushable) { + wrapper.setUpstreamStats(((ISelfProfilingNodePushable) op).getStats()); + } else { + wrapper.setUpstreamStats(stats); + } outputs.put(index, wrapper); } op.setOutputFrameWriter(index, writer, recordDesc); @@ -88,8 +89,7 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable { @Override public IFrameWriter getInputFrameWriter(int index) { if (inputs.get(index) == null) { - IOperatorStats parentStats = parents.get(index) == null ? 
null : parents.get(index).getStats(); - ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats); + ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index)); inputs.put(index, pfw); return pfw; } else { @@ -102,25 +102,26 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable { return op.getDisplayName(); } - public void addParent(int index, ProfiledOperatorNodePushable parent) { - parents.put(index, parent); - } - public IOperatorStats getStats() { return stats; } - public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId, - ProfiledOperatorNodePushable source) throws HyracksDataException { + public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId) + throws HyracksDataException { String name = acId.toString() + " - " + op.getDisplayName(); IStatsCollector statsCollector = ctx.getStatsCollector(); - IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId()); - statsCollector.add(stats); + IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString()); + if (!(op instanceof ISelfProfilingNodePushable)) { + statsCollector.add(stats); + } if (op instanceof IIntrospectingOperator) { ((IIntrospectingOperator) op).setOperatorStats(stats); } + if (op instanceof ISelfProfilingNodePushable) { + ((ISelfProfilingNodePushable) op).addStats(stats); + } if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) { - return new ProfiledOperatorNodePushable(op, stats, source); + return new ProfiledOperatorNodePushable(op, stats); } return op; } @@ -129,7 +130,7 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable { throws HyracksDataException { String name = acId.toString() + " - " + op.getDisplayName(); IStatsCollector statsCollector = ctx.getStatsCollector(); - IOperatorStats 
stats = new OperatorStats(name, acId.getOperatorDescriptorId()); + IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString()); if (op instanceof IIntrospectingOperator) { ((IIntrospectingOperator) op).setOperatorStats(stats); statsCollector.add(stats); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java index 26b3a58679..7770c4fd9d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java @@ -18,10 +18,11 @@ */ package org.apache.hyracks.api.job.profiling; +import java.io.DataInput; +import java.io.IOException; import java.io.Serializable; import java.util.Map; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.io.IWritable; import org.apache.hyracks.api.job.profiling.counters.ICounter; @@ -86,11 +87,22 @@ public interface IOperatorStats extends IWritable, Serializable { ICounter getBytesWritten(); - OperatorDescriptorId getId(); + String getOperatorId(); void updateIndexesStats(Map<String, IndexStats> indexesStats); Map<String, IndexStats> getIndexesStats(); void updateFrom(IOperatorStats stats); + + static IOperatorStats create(DataInput input) throws IOException { + String name = input.readUTF(); + if (NoOpOperatorStats.NOOP_NAME.equals(name)) { + return NoOpOperatorStats.INSTANCE; + } + String operatorId = input.readUTF(); + OperatorStats operatorStats = new OperatorStats(name, operatorId); + operatorStats.readFields(input); + return operatorStats; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java index 4c86a15d40..d427d14542 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java @@ -24,14 +24,15 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.job.profiling.counters.ICounter; public class NoOpOperatorStats implements IOperatorStats { - private static final long serialVersionUID = 9055940822300360135L; + private static final long serialVersionUID = 9055940222300360256L; public static final NoOpOperatorStats INSTANCE = new NoOpOperatorStats(); + public static final String INVALID_ODID = "ODID:-1"; + public static final String NOOP_NAME = "NoOp"; private static final ICounter NOOP_COUNTER = new ICounter() { private static final long serialVersionUID = 1L; @@ -57,21 +58,20 @@ public class NoOpOperatorStats implements IOperatorStats { } }; - private static final OperatorDescriptorId INVALID_ODID = new OperatorDescriptorId(-1); - @Override public void writeFields(DataOutput output) throws IOException { - + output.writeUTF(NOOP_NAME); + output.writeUTF(INVALID_ODID); } @Override public void readFields(DataInput input) throws IOException { - + // nothing } @Override public String getName() { - return ""; + return NOOP_NAME; } @Override @@ -130,7 +130,7 @@ public class NoOpOperatorStats implements IOperatorStats { } @Override - public OperatorDescriptorId getId() { + public String getOperatorId() { return INVALID_ODID; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java index c9f08e0bd5..412b788dba 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java @@ -25,14 +25,13 @@ import java.util.HashMap; import java.util.Map; import org.apache.hyracks.api.com.job.profiling.counters.Counter; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.job.profiling.counters.ICounter; public class OperatorStats implements IOperatorStats { - private static final long serialVersionUID = 6401830963367567169L; - + private static final long serialVersionUID = 6401830963361567126L; public final String operatorName; - public OperatorDescriptorId id; + + public final String operatorId; public final ICounter tupleCounter; public final ICounter timeCounter; public final ICounter pageReads; @@ -48,12 +47,12 @@ public class OperatorStats implements IOperatorStats { //TODO: this is quickly becoming gross it should just be a map where the value is obliged to be a Counter - public OperatorStats(String operatorName, OperatorDescriptorId id) { + public OperatorStats(String operatorName, String operatorId) { if (operatorName == null || operatorName.isEmpty()) { throw new IllegalArgumentException("operatorName must not be null or empty"); } this.operatorName = operatorName; - this.id = id; + this.operatorId = operatorId; tupleCounter = new Counter("tupleCounter"); timeCounter = new Counter("timeCounter"); pageReads = new Counter("diskIoCounter"); @@ -69,17 +68,6 @@ public class OperatorStats implements IOperatorStats { indexesStats = new HashMap<>(); } - public OperatorStats(String operatorName) { - this(operatorName, new OperatorDescriptorId(-1)); - } - - public static IOperatorStats create(DataInput input) throws IOException { - String name 
= input.readUTF(); - OperatorStats operatorStats = new OperatorStats(name); - operatorStats.readFields(input); - return operatorStats; - } - @Override public String getName() { return operatorName; @@ -141,8 +129,8 @@ public class OperatorStats implements IOperatorStats { } @Override - public OperatorDescriptorId getId() { - return id; + public String getOperatorId() { + return operatorId; } @Override @@ -178,7 +166,7 @@ public class OperatorStats implements IOperatorStats { @Override public void writeFields(DataOutput output) throws IOException { output.writeUTF(operatorName); - id.writeFields(output); + output.writeUTF(operatorId); output.writeLong(tupleCounter.get()); output.writeLong(timeCounter.get()); output.writeLong(pageReads.get()); @@ -195,7 +183,6 @@ public class OperatorStats implements IOperatorStats { @Override public void readFields(DataInput input) throws IOException { - id = OperatorDescriptorId.create(input); tupleCounter.set(input.readLong()); timeCounter.set(input.readLong()); pageReads.set(input.readLong()); @@ -229,13 +216,13 @@ public class OperatorStats implements IOperatorStats { @Override public String toString() { - return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": " - + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + ", \"" - + coldReadCounter.getName() + "\": " + coldReadCounter.get() + avgTupleSz.getName() + "\": " - + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + minTupleSz.get() + ", \"" - + minTupleSz.getName() + "\": " + timeCounter.get() + ", \"" + inputTupleCounter.getName() + "\": " - + bytesRead.get() + ", \"" + bytesRead.getName() + "\": " + bytesWritten.get() + ", \"" - + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \"" + level.getName() + "\": " - + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }"; + return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"id\": \"" + operatorId + "\", " + 
"\"" + + tupleCounter.getName() + "\": " + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get() + + avgTupleSz.getName() + "\": " + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + + minTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + timeCounter.get() + ", \"" + + inputTupleCounter.getName() + "\": " + bytesRead.get() + ", \"" + bytesRead.getName() + "\": " + + bytesWritten.get() + ", \"" + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \"" + + level.getName() + "\": " + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }"; } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 4261c1e22a..17f5cb1ea0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -109,11 +109,11 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable */ Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>(); for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = null; + IOperatorNodePushable opPushable; if (profile) { IOperatorNodePushable wrapped = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); - opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey(), null); + opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey()); } else { opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, 
partition, nPartitions); ProfiledOperatorNodePushable.onlyAddStats(opPushable, ctx, entry.getKey()); @@ -147,12 +147,7 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable if (profile) { IOperatorNodePushable wrapped = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); - if (sourceOp instanceof ProfiledOperatorNodePushable) { - destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId, - (ProfiledOperatorNodePushable) sourceOp); - } else { - destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId, null); - } + destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId); } else { destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); @@ -160,12 +155,6 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable } operatorNodePushablesBFSOrder.add(destOp); operatorNodePushables.put(destId, destOp); - } else if (profile) { - if (destOp instanceof ProfiledOperatorNodePushable - && sourceOp instanceof ProfiledOperatorNodePushable) { - ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel, - (ProfiledOperatorNodePushable) sourceOp); - } } /* diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java index 26c664a444..7cfbda05c8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.common.job.profiling; +import static 
org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -31,7 +33,7 @@ import org.apache.hyracks.api.job.profiling.NoOpOperatorStats; import org.apache.hyracks.api.job.profiling.OperatorStats; public class StatsCollector implements IStatsCollector { - private static final long serialVersionUID = 6858817639895434574L; + private static final long serialVersionUID = 6858817639895434379L; private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>(); @@ -61,7 +63,7 @@ public class StatsCollector implements IStatsCollector { @Override public IOperatorStats getAggregatedStats() { - IOperatorStats aggregatedStats = new OperatorStats("aggregated"); + IOperatorStats aggregatedStats = new OperatorStats("aggregated", INVALID_ODID); for (IOperatorStats stats : operatorStatsMap.values()) { aggregatedStats.getInputTupleCounter().update(stats.getInputTupleCounter().get()); aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get()); @@ -74,8 +76,8 @@ public class StatsCollector implements IStatsCollector { @Override public void writeFields(DataOutput output) throws IOException { output.writeInt(operatorStatsMap.size()); - for (IOperatorStats operatorStats : operatorStatsMap.values()) { - operatorStats.writeFields(output); + for (IOperatorStats stats : operatorStatsMap.values()) { + stats.writeFields(output); } } @@ -83,7 +85,7 @@ public class StatsCollector implements IStatsCollector { public void readFields(DataInput input) throws IOException { int operatorCount = input.readInt(); for (int i = 0; i < operatorCount; i++) { - IOperatorStats opStats = OperatorStats.create(input); + IOperatorStats opStats = IOperatorStats.create(input); operatorStatsMap.put(opStats.getName(), opStats); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java index 871d2bc77b..40bc1ba0e9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.common.job.profiling.om; +import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -184,7 +186,7 @@ public class JobProfile extends AbstractProfile { } IOperatorStats opOutStats = outStats[i]; if (opOutStats == null) { - opOutStats = new OperatorStats(operatorName); + opOutStats = new OperatorStats(operatorName, INVALID_ODID); outStats[i] = opOutStats; } opOutStats.updateFrom(opTaskStats); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java index 546360a589..84376f69bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.common.job.profiling.om; +import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -130,8 +132,8 @@ public class TaskProfile 
extends AbstractProfile { jpe.put("name", key); jpe.put("run-time", Double .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000))); - if (value.getId().getId() >= 0) { - jpe.put("runtime-id", value.getId().toString()); + if (!value.getOperatorId().equals(INVALID_ODID)) { + jpe.put("runtime-id", value.getOperatorId()); } if (value.getPageReads().get() > 0) { jpe.put("pages-read", value.getPageReads().get()); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java similarity index 68% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp copy to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java index 1358a62144..7f0f5a9795 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.hyracks.dataflow.std.base; --- compareunorderedarray=true -USE test; +import org.apache.hyracks.api.dataflow.ISelfProfilingNodePushable; +import org.apache.hyracks.api.dataflow.ITimedWriter; -SELECT count(*) AS customers, city -FROM Customers c -WHERE c.age <65 -GROUP BY c.address.city -ORDER BY sleep(city,1666); \ No newline at end of file +public abstract class AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable + extends AbstractUnaryInputUnaryOutputOperatorNodePushable implements ISelfProfilingNodePushable, ITimedWriter { + +}
