This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 7285518ca0d7492c7393ed906cb2fe041aaf714a
Author: Ian Maxon <[email protected]>
AuthorDate: Thu Jul 10 16:17:38 2025 -0700

    [ASTERIXDB-3628][RT] Correct subplan profiles
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    Currently subplan timing information is calculated during close().
    This can lead to inaccuracies in the timing, especially when the
    subplan is near the end of a task pipeline, because other
    operator's times are calculated after close, during deinitialize().
    
    Correct this by adding the capability in the MetaOperator to invoke a
    method on each ProfiledPushRuntime SubplanPushRuntime to calculate the
    timings, so it can be invoked at the proper time as to retrieve the
    correct time from its downstream operators.
    
    Ext-ref: MB-67499
    
    Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 .../src/test/resources/runtimets/profiled.xml      |  8 ++++++
 .../profile/subplan/subplan.1.ddl.sqlpp            | 26 ++++++++++++++++++
 .../profile/subplan/subplan.2.update.sqlpp         | 32 ++++++++++++++++++++++
 .../profile/subplan/subplan.3.plans.sqlpp          | 30 ++++++++++++++++++++
 .../profile/subplan/subplan.4.plans.sqlpp          | 25 +++++++++++++++++
 .../profile/subplan/subplan.5.plans.sqlpp          | 28 +++++++++++++++++++
 .../results/profile/subplan/subplan.3.regex        |  1 +
 .../results/profile/subplan/subplan.4.regex        |  1 +
 .../results/profile/subplan/subplan.5.regex        |  1 +
 .../runtime/base/IProfiledPushRuntime.java         | 25 +++++++++++++++++
 .../runtime/base/ProfiledPushRuntime.java          | 17 +++++++-----
 .../meta/AlgebricksMetaOperatorDescriptor.java     | 14 ++++++++--
 .../runtime/operators/meta/PipelineAssembler.java  |  9 ++++++
 .../operators/meta/SubplanRuntimeFactory.java      | 24 ++++++++++++++--
 .../api/dataflow/ProfiledOperatorNodePushable.java |  2 +-
 15 files changed, 230 insertions(+), 13 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
index b3f222a557..2efc64f627 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -58,5 +58,13 @@
         <output-dir compare="Text">non-unary-subplan</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="profile">
+      <compilation-unit name="subplan">
+        <parameter name="profile" value="timings" type="string"/>
+        <parameter name="optimized-logical-plan" value="true" type="string"/>
+        <parameter name="plan-format" value="json" type="string"/>
+        <output-dir compare="Text">subplan</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
new file mode 100644
index 0000000000..c86a631a8f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset test1 primary key(id: uuid) autogenerated;
+create dataset test2 primary key(id: uuid) autogenerated;
+create dataset test3 primary key(id: uuid) autogenerated;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
new file mode 100644
index 0000000000..91fd485ae0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into test1 (
+{"x1": 88, "x2": 99, "states": [1,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 
11, "m": 22}] },
+{"x1": 88, "x2": 99, "states": [5,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 
11, "m": 22}] }
+);
+insert into test2 (
+{"elements": [{"a": 1, "states": [1,2]}, {"a": 2, "states": [4,5]}, {"a": 3, 
"states": [9,10]}] }
+);
+insert into test3 (
+{"elements": [{"elements1": [1,2,3]}, {"elements1": [1,2,3]}]},
+{"elements": [{"elements1": [0,2,5]}, {"elements1": [11,22,1]}]}
+);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
new file mode 100644
index 0000000000..ae2b41fc3f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT f1,
+COUNT(DISTINCT test11) AS f2,
+COUNT(DISTINCT test12) as f3,
+COUNT((SELECT m FROM el)) AS f4
+FROM test1
+LET f1 = (CASE WHEN (ANY a IN test1.states SATISFIES a = 1) THEN "1" WHEN (ANY 
a IN test1.states SATISFIES a = sleep(2,100)) THEN "2" ELSE "3" END),
+el = (SELECT s,m FROM test1.elements)
+GROUP BY f1;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
new file mode 100644
index 0000000000..86c2d3b668
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT states
+FROM test2
+LET states = (SELECT sleep(el.a,200) FROM test2.elements AS el WHERE ANY i IN 
el.states SATISFIES i > 9);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
new file mode 100644
index 0000000000..85e43184e2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT CASE WHEN (ANY i IN elements3 SATISFIES i > sleep(1,100)) THEN "a" ELSE 
"" END AS f
+FROM test3
+LET
+elements0 = test3.elements,
+elements2 = (FROM elements0 AS o SELECT CASE WHEN (ANY i IN o.elements1 
SATISFIES i > 2) THEN "b" ELSE "" END AS a),
+elements3 = (FROM elements2 SELECT a);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
new file mode 100644
index 0000000000..96bffa9f22
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*8[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
new file mode 100644
index 0000000000..0f6580ec2d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"assign",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*2[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
new file mode 100644
index 0000000000..3840b32e3a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*4[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
new file mode 100644
index 0000000000..9875a2c96f
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+public interface IProfiledPushRuntime extends IPushRuntime {
+
+    public void computeTimings();
+
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
index bf4533fb06..ff9a58b555 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -28,7 +28,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 
-public class ProfiledPushRuntime extends ProfiledFrameWriter implements 
IPushRuntime {
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements 
IProfiledPushRuntime {
 
     private final IPushRuntime wrapped;
     private final IOperatorStats stats;
@@ -45,9 +45,16 @@ public class ProfiledPushRuntime extends ProfiledFrameWriter 
implements IPushRun
         this.last = last;
     }
 
+    public IOperatorStats getStats() {
+        return stats;
+    }
+
     @Override
-    public void close() throws HyracksDataException {
-        super.close();
+    public void computeTimings() {
+        //mainly to push through to subplans
+        if (wrapped instanceof IProfiledPushRuntime) {
+            ((IProfiledPushRuntime) wrapped).computeTimings();
+        }
         long ownTime = getTotalTime();
         //for micro union all. accumulate the time of each input into the 
counter.
         //then, on input 0, subtract the output from the accumulated time.
@@ -62,10 +69,6 @@ public class ProfiledPushRuntime extends ProfiledFrameWriter 
implements IPushRun
         stats.getTimeCounter().set(ownTime);
     }
 
-    public IOperatorStats getStats() {
-        return stats;
-    }
-
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, 
RecordDescriptor recordDesc) {
         if (writer instanceof ITimedWriter) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 560f8173b3..232121b262 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -21,11 +21,13 @@ package 
org.apache.hyracks.algebricks.runtime.operators.meta;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -35,6 +37,7 @@ import 
org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
@@ -204,6 +207,7 @@ public class AlgebricksMetaOperatorDescriptor extends 
AbstractSingleActivityOper
             private boolean opened = false;
             private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
             private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = 
new HashMap<>();
+            private List<IProfiledPushRuntime> profiledPushRuntimes = 
Collections.emptyList();
 
             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
@@ -211,9 +215,12 @@ public class AlgebricksMetaOperatorDescriptor extends 
AbstractSingleActivityOper
                             outputArity > 0 ? 
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
                     RecordDescriptor pipelineInputRecordDescriptor = 
recordDescProvider
                             
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(),
 0);
-                    PipelineAssembler pa = new PipelineAssembler(pipeline, 
inputArity, outputArity,
+                    PipelineAssembler assembler = new 
PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, 
pipelineOutputRecordDescriptor);
-                    startOfPipeline = pa.assemblePipeline(writer, ctx, 
microOpStats);
+                    startOfPipeline = assembler.assemblePipeline(writer, ctx, 
microOpStats);
+                    if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+                        profiledPushRuntimes = 
assembler.getProfiledPushRuntimes();
+                    }
                 }
                 opened = true;
                 startOfPipeline.open();
@@ -246,6 +253,9 @@ public class AlgebricksMetaOperatorDescriptor extends 
AbstractSingleActivityOper
             @Override
             public void deinitialize() throws HyracksDataException {
                 super.deinitialize();
+                if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+                    
profiledPushRuntimes.forEach(IProfiledPushRuntime::computeTimings);
+                }
             }
 
             @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index c2e8473efc..85a4b51a07 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,11 +18,15 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
@@ -129,6 +133,11 @@ public class PipelineAssembler {
         return runtimeMap.get(runtimeFactory);
     }
 
+    public List<IProfiledPushRuntime> getProfiledPushRuntimes() {
+        return runtimeMap.values().stream().flatMap(Arrays::stream).filter(f 
-> f instanceof IProfiledPushRuntime)
+                .map(f -> (IProfiledPushRuntime) 
f).collect(Collectors.toList());
+    }
+
     //TODO: refactoring is needed
     public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, 
IFrameWriter writer,
             IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> 
outRuntimeMap) throws HyracksDataException {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 242c603512..7f9f3a9c3b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,12 +20,15 @@ package 
org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -116,20 +119,28 @@ public class SubplanRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFacto
         return new SubplanPushRuntime(ctx, false);
     }
 
-    public class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime
+            implements IProfiledPushRuntime {
 
         protected final IHyracksTaskContext ctx;
 
         protected final NestedTupleSourceRuntime[] startOfPipelines;
 
-        boolean first;
+        private final boolean profile;
+
+        private final List<IProfiledPushRuntime> timedMicroOps;
 
-        boolean profile;
+        boolean first;
 
         protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean 
ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
+            if (profile) {
+                timedMicroOps = new ArrayList<>();
+            } else {
+                timedMicroOps = Collections.emptyList();
+            }
 
             IMissingWriter[] missingWriters = new 
IMissingWriter[missingWriterFactories.length];
             for (int i = 0; i < missingWriterFactories.length; i++) {
@@ -169,11 +180,18 @@ public class SubplanRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFacto
                 PipelineAssembler pa =
                         new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, 
outputRecordDescriptor, ignoreFailures);
                 IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, 
stats);
+                if (profile) {
+                    timedMicroOps.addAll(pa.getProfiledPushRuntimes());
+                }
                 startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
             }
         }
 
+        public void computeTimings() {
+            timedMicroOps.forEach(IProfiledPushRuntime::computeTimings);
+        }
+
         @Override
         public void open() throws HyracksDataException {
             // writer opened many times?
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index cb188c0126..072a684550 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -55,6 +55,7 @@ public class ProfiledOperatorNodePushable implements 
IOperatorNodePushable, ISta
 
     @Override
     public void deinitialize() throws HyracksDataException {
+        ProfiledFrameWriter.timeMethod(op::deinitialize, totalTime);
         long ownTime = totalTime.get();
         for (ITimedWriter i : inputs.values()) {
             ownTime += i.getTotalTime();
@@ -62,7 +63,6 @@ public class ProfiledOperatorNodePushable implements 
IOperatorNodePushable, ISta
         for (ITimedWriter w : outputs.values()) {
             ownTime -= w.getTotalTime();
         }
-        op.deinitialize();
         stats.getTimeCounter().set(ownTime);
     }
 

Reply via email to