This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6c1e847f1902e1ba710134b0ab15fd7daf76082d Author: Ian Maxon <[email protected]> AuthorDate: Thu Nov 16 20:24:30 2023 -0800 [ASTERIXDB-3313][COMP] Defuse profile when not profiling - user model changes: no - storage format changes: no - interface changes: no Details: The logical2physical map is time-consuming to produce, so it should not be computed unless it's needed for a profile. Likewise, keeping the last plan in memory is wasteful and useless unless a profile is needed. Lastly, in the pipeline assembler, check the profile flag first instead of the instanceof check to save time. Change-Id: Ia9a84eedbcb33c29f03155a8605bb82af372f1f3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17962 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../java/org/apache/asterix/api/common/APIFramework.java | 11 ++++++++--- .../org/apache/asterix/app/translator/QueryTranslator.java | 6 +++--- .../compiler/api/HeuristicCompilerFactoryBuilder.java | 13 +++++++++++++ .../apache/hyracks/algebricks/compiler/api/ICompiler.java | 6 ++++++ .../hyracks/algebricks/core/jobgen/impl/JobBuilder.java | 12 +++++++++++- .../hyracks/algebricks/core/jobgen/impl/PlanCompiler.java | 9 +++++++++ .../runtime/operators/meta/PipelineAssembler.java | 2 +- 7 files changed, 51 insertions(+), 8 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 3c0b01de72..0cc27903e4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import 
java.util.HashSet; import java.util.List; import java.util.Map; @@ -107,6 +108,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.resource.IClusterCapacity; @@ -204,7 +206,7 @@ public class APIFramework { public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, - IWarningCollector warningCollector, IRequestParameters requestParameters) + IWarningCollector warningCollector, IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags) throws AlgebricksException, ACIDException { // establish facts @@ -325,7 +327,7 @@ public class APIFramework { JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction()); - JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory); + JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory, runtimeFlags); if (isQuery) { if (!compiler.skipJobCapacityAssignment()) { @@ -347,7 +349,10 @@ public class APIFramework { if (isQuery || isLoad) { generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(), cboMode); - lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat()); + if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) { + lastPlan = + new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat()); + } } } diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 8b635a2b22..e602b761d8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3663,7 +3663,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted()); cls.setSourceLocation(stmt.getSourceLocation()); JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, - null, responsePrinter, warningCollector, null); + null, responsePrinter, warningCollector, null, jobFlags); afterCompile(); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -3794,7 +3794,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Query Compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, rewrittenResult.second, stmt == null ? 
null : stmt.getDatasetName(), sessionOutput, stmt, externalVars, - responsePrinter, warningCollector, requestParameters); + responsePrinter, warningCollector, requestParameters, jobFlags); } protected JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector, @@ -3835,7 +3835,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(), rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter, - warningCollector, null); + warningCollector, null, jobFlags); } protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 85910aa1c4..a44e1be35e 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.algebricks.compiler.api; +import java.util.EnumSet; import java.util.List; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -47,6 +48,7 @@ import org.apache.hyracks.algebricks.data.IAWriterFactory; import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobFlag; import 
org.apache.hyracks.api.job.JobSpecification; public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder { @@ -171,6 +173,17 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil return pc.compilePlan(plan, jobEventListenerFactory); } + @Override + public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory, + EnumSet<JobFlag> runtimeFlags) throws AlgebricksException { + AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n"); + PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory); + if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) { + pc.enableLog2PhysMapping(); + } + return pc.compilePlan(plan, jobEventListenerFactory); + } + @Override public boolean skipJobCapacityAssignment() { return oc.skipJobCapacityAssignment(); diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java index 0f5798e578..7de0adb0e3 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.algebricks.compiler.api; +import java.util.EnumSet; + import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobSpecification; public interface ICompiler { @@ -28,5 +31,8 @@ public interface ICompiler { public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException; + JobSpecification 
createJob(Object appContext, IJobletEventListenerFactory jobletEventListenerFactory, + EnumSet<JobFlag> runtimeFlags) throws AlgebricksException; + boolean skipJobCapacityAssignment(); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index 99ef8d00c5..ca26515554 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -74,6 +74,8 @@ public class JobBuilder implements IHyracksJobBuilder { private int aodCounter = 0; + private boolean genLog2PhysMap = false; + public JobBuilder(JobSpecification jobSpec, AlgebricksAbsolutePartitionConstraint clusterLocations) { this.jobSpec = jobSpec; this.clusterLocations = clusterLocations; @@ -96,6 +98,10 @@ public class JobBuilder implements IHyracksJobBuilder { new String[] { clusterLocations.getLocations()[Math.abs(jobSpec.hashCode() % nPartitions)] }); } + public void enableLog2PhysMapping() { + this.genLog2PhysMap = true; + } + @Override public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) { contributeMicroOperator(op, runtime, recDesc, null); @@ -214,7 +220,11 @@ public class JobBuilder implements IHyracksJobBuilder { jobSpec.addRoot(opDesc); } setAllPartitionConstraints(tgtConstraints); - jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap()); + if (genLog2PhysMap) { + jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap()); + } else { + jobSpec.setLogical2PhysicalMap(Collections.emptyMap()); + } } public List<IOperatorDescriptor> getGeneratedMetaOps() { diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 2ef5c6c8f9..f8bdfa5113 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -39,6 +39,8 @@ public class PlanCompiler { private JobGenContext context; private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<>(); + boolean genLog2PhysMap = false; + public PlanCompiler(JobGenContext context) { this.context = context; } @@ -47,6 +49,10 @@ public class PlanCompiler { return context; } + public void enableLog2PhysMapping() { + this.genLog2PhysMap = true; + } + public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException { return compilePlanImpl(plan, false, null, jobEventListenerFactory); @@ -66,6 +72,9 @@ public class PlanCompiler { } List<ILogicalOperator> rootOps = new ArrayList<>(); JobBuilder builder = new JobBuilder(spec, context.getClusterLocations()); + if (genLog2PhysMap) { + builder.enableLog2PhysMapping(); + } for (Mutable<ILogicalOperator> opRef : plan.getRoots()) { compileOpRef(opRef, spec, builder, outerPlanSchema); rootOps.add(opRef.getValue()); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 0ce2ff523a..202c087617 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -77,7 +77,7 @@ public class PipelineAssembler { for (int j = 0; j < newRuntimes.length; j++) { //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow - boolean shouldProfile = !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) && profile + boolean shouldProfile = profile && !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) && microOpStats.containsKey(runtimeFactory); if (shouldProfile) { ProfiledPushRuntime profiled;
