This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 7285518ca0d7492c7393ed906cb2fe041aaf714a Author: Ian Maxon <[email protected]> AuthorDate: Thu Jul 10 16:17:38 2025 -0700 [ASTERIXDB-3628][RT] Correct subplan profiles - user model changes: no - storage format changes: no - interface changes: no Details: Currently subplan timing information is calculated during close(). This can lead to inaccuracies in the timing, especially when the subplan is near the end of a task pipeline, because other operators' times are calculated after close, during deinitialize(). Correct this by adding the capability in the MetaOperator to invoke a method on each ProfiledPushRuntime and SubplanPushRuntime to calculate the timings, so it can be invoked at the proper time to retrieve the correct time from its downstream operators. Ext-ref: MB-67499 Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069 Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> --- .../src/test/resources/runtimets/profiled.xml | 8 ++++++ .../profile/subplan/subplan.1.ddl.sqlpp | 26 ++++++++++++++++++ .../profile/subplan/subplan.2.update.sqlpp | 32 ++++++++++++++++++++++ .../profile/subplan/subplan.3.plans.sqlpp | 30 ++++++++++++++++++++ .../profile/subplan/subplan.4.plans.sqlpp | 25 +++++++++++++++++ .../profile/subplan/subplan.5.plans.sqlpp | 28 +++++++++++++++++++ .../results/profile/subplan/subplan.3.regex | 1 + .../results/profile/subplan/subplan.4.regex | 1 + .../results/profile/subplan/subplan.5.regex | 1 + .../runtime/base/IProfiledPushRuntime.java | 25 +++++++++++++++++ .../runtime/base/ProfiledPushRuntime.java | 17 +++++++----- .../meta/AlgebricksMetaOperatorDescriptor.java | 14 ++++++++-- .../runtime/operators/meta/PipelineAssembler.java | 9 ++++++ .../operators/meta/SubplanRuntimeFactory.java | 24 ++++++++++++++-- .../api/dataflow/ProfiledOperatorNodePushable.java | 2 +- 15 files changed, 230 
insertions(+), 13 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml index b3f222a557..2efc64f627 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml @@ -58,5 +58,13 @@ <output-dir compare="Text">non-unary-subplan</output-dir> </compilation-unit> </test-case> + <test-case FilePath="profile"> + <compilation-unit name="subplan"> + <parameter name="profile" value="timings" type="string"/> + <parameter name="optimized-logical-plan" value="true" type="string"/> + <parameter name="plan-format" value="json" type="string"/> + <output-dir compare="Text">subplan</output-dir> + </compilation-unit> + </test-case> </test-group> </test-suite> \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp new file mode 100644 index 0000000000..c86a631a8f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +create dataset test1 primary key(id: uuid) autogenerated; +create dataset test2 primary key(id: uuid) autogenerated; +create dataset test3 primary key(id: uuid) autogenerated; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp new file mode 100644 index 0000000000..91fd485ae0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +insert into test1 ( +{"x1": 88, "x2": 99, "states": [1,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 11, "m": 22}] }, +{"x1": 88, "x2": 99, "states": [5,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 11, "m": 22}] } +); +insert into test2 ( +{"elements": [{"a": 1, "states": [1,2]}, {"a": 2, "states": [4,5]}, {"a": 3, "states": [9,10]}] } +); +insert into test3 ( +{"elements": [{"elements1": [1,2,3]}, {"elements1": [1,2,3]}]}, +{"elements": [{"elements1": [0,2,5]}, {"elements1": [11,22,1]}]} +); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp new file mode 100644 index 0000000000..ae2b41fc3f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +-- compareunorderedarray=true +use test; + +SELECT f1, +COUNT(DISTINCT test11) AS f2, +COUNT(DISTINCT test12) as f3, +COUNT((SELECT m FROM el)) AS f4 +FROM test1 +LET f1 = (CASE WHEN (ANY a IN test1.states SATISFIES a = 1) THEN "1" WHEN (ANY a IN test1.states SATISFIES a = sleep(2,100)) THEN "2" ELSE "3" END), +el = (SELECT s,m FROM test1.elements) +GROUP BY f1; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp new file mode 100644 index 0000000000..86c2d3b668 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +-- compareunorderedarray=true +use test; + +SELECT states +FROM test2 +LET states = (SELECT sleep(el.a,200) FROM test2.elements AS el WHERE ANY i IN el.states SATISFIES i > 9); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp new file mode 100644 index 0000000000..85e43184e2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +-- compareunorderedarray=true +use test; + +SELECT CASE WHEN (ANY i IN elements3 SATISFIES i > sleep(1,100)) THEN "a" ELSE "" END AS f +FROM test3 +LET +elements0 = test3.elements, +elements2 = (FROM elements0 AS o SELECT CASE WHEN (ANY i IN o.elements1 SATISFIES i > 2) THEN "b" ELSE "" END AS a), +elements3 = (FROM elements2 SELECT a); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex new file mode 100644 index 0000000000..96bffa9f22 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex @@ -0,0 +1 @@ +/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*8[0-9]{2}(?:\.\d+)?/m \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex new file mode 100644 index 0000000000..0f6580ec2d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex @@ -0,0 +1 @@ +/"operator"\s*:\s*"assign",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*2[0-9]{2}(?:\.\d+)?/m \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex new file mode 100644 index 0000000000..3840b32e3a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex @@ -0,0 +1 @@ +/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*4[0-9]{2}(?:\.\d+)?/m \ No newline at end of file diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java new file mode 100644 index 0000000000..9875a2c96f --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.runtime.base; + +public interface IProfiledPushRuntime extends IPushRuntime { + + public void computeTimings(); + +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java index bf4533fb06..ff9a58b555 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java @@ -28,7 +28,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.profiling.IOperatorStats; -public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime { +public class ProfiledPushRuntime extends ProfiledFrameWriter implements IProfiledPushRuntime { private final IPushRuntime wrapped; private final IOperatorStats stats; @@ -45,9 +45,16 @@ public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRun this.last = last; } + public IOperatorStats getStats() { + return stats; + } + @Override - public void close() throws HyracksDataException { - super.close(); + public void computeTimings() { + //mainly to push through to subplans + if (wrapped instanceof IProfiledPushRuntime) { + ((IProfiledPushRuntime) wrapped).computeTimings(); + } long ownTime = getTotalTime(); //for micro union all. accumulate the time of each input into the counter. //then, on input 0, subtract the output from the accumulated time. 
@@ -62,10 +69,6 @@ public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRun stats.getTimeCounter().set(ownTime); } - public IOperatorStats getStats() { - return stats; - } - @Override public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { if (writer instanceof ITimedWriter) { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 560f8173b3..232121b262 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -21,11 +21,13 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -35,6 +37,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.IOperatorStats; import 
org.apache.hyracks.api.job.profiling.NoOpOperatorStats; import org.apache.hyracks.api.job.profiling.OperatorStats; @@ -204,6 +207,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper private boolean opened = false; private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE; private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>(); + private List<IProfiledPushRuntime> profiledPushRuntimes = Collections.emptyList(); public void open() throws HyracksDataException { if (startOfPipeline == null) { @@ -211,9 +215,12 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null; RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0); - PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, + PipelineAssembler assembler = new PipelineAssembler(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); - startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats); + startOfPipeline = assembler.assemblePipeline(writer, ctx, microOpStats); + if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) { + profiledPushRuntimes = assembler.getProfiledPushRuntimes(); + } } opened = true; startOfPipeline.open(); @@ -246,6 +253,9 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper @Override public void deinitialize() throws HyracksDataException { super.deinitialize(); + if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) { + profiledPushRuntimes.forEach(IProfiledPushRuntime::computeTimings); + } } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index c2e8473efc..85a4b51a07 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -18,11 +18,15 @@ */ package org.apache.hyracks.algebricks.runtime.operators.meta; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime; @@ -129,6 +133,11 @@ public class PipelineAssembler { return runtimeMap.get(runtimeFactory); } + public List<IProfiledPushRuntime> getProfiledPushRuntimes() { + return runtimeMap.values().stream().flatMap(Arrays::stream).filter(f -> f instanceof IProfiledPushRuntime) + .map(f -> (IProfiledPushRuntime) f).collect(Collectors.toList()); + } + //TODO: refactoring is needed public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 
242c603512..7f9f3a9c3b 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -20,12 +20,15 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; @@ -116,20 +119,28 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto return new SubplanPushRuntime(ctx, false); } - public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime + implements IProfiledPushRuntime { protected final IHyracksTaskContext ctx; protected final NestedTupleSourceRuntime[] startOfPipelines; - boolean first; + private final boolean profile; + + private final List<IProfiledPushRuntime> timedMicroOps; - boolean profile; + boolean first; protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException { this.ctx = ctx; this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); this.first = true; + if (profile) { + timedMicroOps = new ArrayList<>(); + } else { + timedMicroOps = Collections.emptyList(); + } 
IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length]; for (int i = 0; i < missingWriterFactories.length; i++) { @@ -169,11 +180,18 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures); IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats); + if (profile) { + timedMicroOps.addAll(pa.getProfiledPushRuntimes()); + } startOfPipelines[i] = (NestedTupleSourceRuntime) head; pipelineAssemblers[i] = pa; } } + public void computeTimings() { + timedMicroOps.forEach(IProfiledPushRuntime::computeTimings); + } + @Override public void open() throws HyracksDataException { // writer opened many times? diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java index cb188c0126..072a684550 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java @@ -55,6 +55,7 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable, ISta @Override public void deinitialize() throws HyracksDataException { + ProfiledFrameWriter.timeMethod(op::deinitialize, totalTime); long ownTime = totalTime.get(); for (ITimedWriter i : inputs.values()) { ownTime += i.getTotalTime(); @@ -62,7 +63,6 @@ public class ProfiledOperatorNodePushable implements IOperatorNodePushable, ISta for (ITimedWriter w : outputs.values()) { ownTime -= w.getTotalTime(); } - op.deinitialize(); stats.getTimeCounter().set(ownTime); }
