This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 79af162510b14bfdbc56273ca284ee8b16681533 Author: Ali Alsuliman <[email protected]> AuthorDate: Sat Jul 20 04:38:16 2024 +0300 [ASTERIXDB-3466][COMP] Make operators minimum frames budget configurable - user model changes: no - storage format changes: no - interface changes: yes Details: For operators that require a (minimum) budget like sort and join, make the minimum required frames configurable. Default is 512 KB. Ext-ref: MB-62818 Change-Id: I9c01815bc3b0498486728898a41dde4da271d041 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18506 Tested-by: Ali Alsuliman <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../operators/physical/InvertedIndexPOperator.java | 6 +++ .../app/resource/PlanStagesGeneratorTest.java | 4 +- .../api/cluster_state_1/cluster_state_1.1.regexadm | 4 ++ .../cluster_state_1_full.1.regexadm | 4 ++ .../cluster_state_1_less.1.regexadm | 4 ++ .../dataset-resources/dataset-resources.6.regex | 2 +- .../dataset-resources/dataset-resources.7.regex | 2 +- .../runtimets/results/misc/jobs/jobs.2.regex | 2 +- .../runtimets/results/misc/ping/ping.2.regex | 2 +- .../asterix/common/config/CompilerProperties.java | 42 ++++++++++++++- .../common/config/OptimizationConfUtil.java | 10 ++-- .../core/algebra/base/IPhysicalOperator.java | 3 ++ .../physical/AbstractGroupByPOperator.java | 6 +++ .../operators/physical/AbstractJoinPOperator.java | 6 +++ .../physical/AbstractPhysicalOperator.java | 6 +++ .../physical/AbstractStableSortPOperator.java | 6 +++ .../operators/physical/WindowPOperator.java | 6 +++ .../operators/physical/WindowStreamPOperator.java | 6 +++ .../rewriter/base/PhysicalOptimizationConfig.java | 62 ++++++++++++++++++++++ .../rewriter/rules/SetMemoryRequirementsRule.java | 27 ++++++---- .../apache/hyracks/api/exceptions/ErrorCode.java | 1 + .../src/main/resources/errormsg/en.properties | 1 + .../dataflow/std/sort/AbstractFrameSorter.java | 6 +-- 23 files changed, 195 insertions(+), 23 deletions(-) diff 
--git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index 2d5b57d1f4..d83f1ea01d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -55,6 +55,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -99,6 +100,11 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH); } + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH); + } + @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java index c09cb34062..574dc8a848 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java @@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.Preclustere import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.junit.Assert; import org.junit.Test; @@ -78,6 +79,7 @@ public class PlanStagesGeneratorTest { private static final int FRAME_SIZE = 32768; private static final int PARALLELISM = 10; private static final long MAX_BUFFER_PER_CONNECTION = 1L; + private static final PhysicalOptimizationConfig physicalConfig = new PhysicalOptimizationConfig(); @Test public void noBlockingPlan() throws AlgebricksException { @@ -336,7 +338,7 @@ public class PlanStagesGeneratorTest { private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) { for (PlanStage stage : stages) { for (ILogicalOperator op : stage.getOperators()) { - ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op); + ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op, physicalConfig); } } final IClusterCapacity clusterCapacity = diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index ad6bfe4ee3..a1736da36f 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -41,7 +41,11 @@ "compiler\.indexonly" : true, "compiler\.internal\.sanitycheck" : true, "compiler\.joinmemory" : 262144, + "compiler.min.groupmemory" : 524288, + "compiler.min.joinmemory" : 524288, "compiler\.min\.memory\.allocation" : true, + "compiler.min.sortmemory" : 524288, + "compiler.min.windowmemory" : 524288, "compiler\.parallelism" : 0, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 434c135e7d..6f3bc38b34 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -41,7 +41,11 @@ "compiler\.indexonly" : true, "compiler\.internal\.sanitycheck" : false, "compiler\.joinmemory" : 262144, + "compiler.min.groupmemory" : 524288, + "compiler.min.joinmemory" : 524288, "compiler\.min\.memory\.allocation" : true, + "compiler.min.sortmemory" : 524288, + "compiler.min.windowmemory" : 524288, "compiler\.parallelism" : -1, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 296ac47c36..415bdba663 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -41,7 +41,11 @@ "compiler\.indexonly" : true, "compiler\.internal\.sanitycheck" : false, "compiler\.joinmemory" : 262144, + "compiler.min.groupmemory" : 524288, + "compiler.min.joinmemory" : 524288, "compiler\.min\.memory\.allocation" : true, + "compiler.min.sortmemory" : 524288, + "compiler.min.windowmemory" : 524288, "compiler\.parallelism" : 3, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex index 13bbeceba7..7996c332c5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex @@ -1 +1 @@ -/memory\D+240844/ \ No newline at end of file +/memory\D+688128/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex index 13bbeceba7..7996c332c5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex @@ -1 +1 @@ -/memory\D+240844/ \ No newline at end of file +/memory\D+688128/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex index 13bbeceba7..7996c332c5 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex @@ -1 +1 @@ -/memory\D+240844/ \ No newline at end of file +/memory\D+688128/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex index 13bbeceba7..7996c332c5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex @@ -1 +1 @@ -/memory\D+240844/ \ No newline at end of file +/memory\D+688128/ \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index cfe7ce8d5c..8ca5c94ba0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.common.config; +import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_GROUP_BY; +import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_JOIN; +import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT; +import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_WINDOW; import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; @@ -66,6 +70,22 @@ public class CompilerProperties extends AbstractProperties { 
INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(32, KILOBYTE), "The page size (in bytes) for computation"), + COMPILER_MIN_SORTMEMORY( + LONG_BYTE_UNIT, + StorageUtil.getLongSizeInBytes(512, KILOBYTE), + "The min memory budget (in bytes) for a sort operator instance in a partition"), + COMPILER_MIN_JOINMEMORY( + LONG_BYTE_UNIT, + StorageUtil.getLongSizeInBytes(512, KILOBYTE), + "The min memory budget (in bytes) for a join operator instance in a partition"), + COMPILER_MIN_GROUPMEMORY( + LONG_BYTE_UNIT, + StorageUtil.getLongSizeInBytes(512, KILOBYTE), + "The min memory budget (in bytes) for a group by operator instance in a partition"), + COMPILER_MIN_WINDOWMEMORY( + LONG_BYTE_UNIT, + StorageUtil.getLongSizeInBytes(512, KILOBYTE), + "The min memory budget (in bytes) for a window operator instance in a partition"), COMPILER_PARALLELISM( INTEGER, COMPILER_PARALLELISM_AS_STORAGE, @@ -240,6 +260,26 @@ public class CompilerProperties extends AbstractProperties { return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY); } + public int getMinSortMemoryFrames() { + int numFrames = (int) (accessor.getLong(Option.COMPILER_MIN_SORTMEMORY) / getFrameSize()); + return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT); + } + + public int getMinJoinMemoryFrames() { + int numFrames = (int) (accessor.getLong(Option.COMPILER_MIN_JOINMEMORY) / getFrameSize()); + return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_JOIN); + } + + public int getMinGroupMemoryFrames() { + int numFrames = (int) (accessor.getLong(Option.COMPILER_MIN_GROUPMEMORY) / getFrameSize()); + return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_GROUP_BY); + } + + public int getMinWindowMemoryFrames() { + int numFrames = (int) (accessor.getLong(Option.COMPILER_MIN_WINDOWMEMORY) / getFrameSize()); + return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_WINDOW); + } + public int getFrameSize() { return accessor.getInt(Option.COMPILER_FRAMESIZE); } @@ -315,7 +355,7 @@ public class CompilerProperties extends AbstractProperties { public int
getSortMemoryFrames() { int numFrames = (int) getSortMemorySize() / getFrameSize(); - return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT); + return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT); } public boolean isColumnFilter() { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java index c83b86ee77..160c04dde9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java @@ -40,9 +40,9 @@ import org.apache.hyracks.control.common.config.OptionTypes; public class OptimizationConfUtil { public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT; - private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY; - private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN; - private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW; + public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY; + public static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN; + public static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW; public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see InvertedIndexPOperator private OptimizationConfUtil() { @@ -119,6 +119,10 @@ public class OptimizationConfUtil { physOptConf.setForceJoinOrderMode(forceJoinOrder); physOptConf.setQueryPlanShapeMode(queryPlanShape); physOptConf.setColumnFilter(columnFilter); + physOptConf.setMinSortFrames(compilerProperties.getMinSortMemoryFrames()); + 
physOptConf.setMinJoinFrames(compilerProperties.getMinJoinMemoryFrames()); + physOptConf.setMinGroupFrames(compilerProperties.getMinGroupMemoryFrames()); + physOptConf.setMinWindowFrames(compilerProperties.getMinWindowMemoryFrames()); // We should have already validated the parameter names at this point... Set<String> filteredParameterNames = new HashSet<>(parameterNames); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java index b0624e8be3..d1b1815328 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java @@ -25,6 +25,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; public interface IPhysicalOperator { @@ -56,6 +57,8 @@ public interface IPhysicalOperator { public void createLocalMemoryRequirements(ILogicalOperator op); + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig); + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException; diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java index 99d87c3016..bb8c488736 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java @@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.exceptions.ErrorCode; public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator { @@ -85,6 +86,11 @@ public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_GROUP_BY); } + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinGroupFrames()); + } + @Override public String toString() { return getOperatorTag().toString() + columnList; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java index c300392c6e..bf2b6124f6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java @@ -22,6 +22,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator { @@ -65,4 +66,9 @@ public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator { public void createLocalMemoryRequirements(ILogicalOperator op) { localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN); } + + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinJoinFrames()); + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java index 4bc750244c..298ad38c7c 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java @@ -42,6 +42,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; @@ -104,6 +105,11 @@ public abstract class AbstractPhysicalOperator implements IPhysicalOperator { localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1); } + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + createLocalMemoryRequirements(op); // default: the operator's own (config-independent) minimum + } + @Override public void disableJobGenBelowMe() { this.disableJobGenBelow = true; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java index f941bdbb63..ff453222a6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java +++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java @@ -47,6 +47,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator { @@ -187,4 +188,9 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat public void createLocalMemoryRequirements(ILogicalOperator op) { localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT); } + + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinSortFrames()); + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java index 0e9191fdc2..9e25d85866 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -28,6 +28,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperat import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; @@ -68,6 +69,11 @@ public final class WindowPOperator extends AbstractWindowPOperator { localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_WINDOW); } + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinWindowFrames()); + } + @Override protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java index 78983b988a..ead5f569f7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java @@ -28,6 +28,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperat import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; @@ -54,6 +55,11 @@ public final class WindowStreamPOperator extends AbstractWindowPOperator { localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM); } + @Override + public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) { + localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM); + } + @Override protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index d16715388b..11171a1094 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@ -18,6 +18,11 @@ */ package org.apache.hyracks.algebricks.core.rewriter.base; +import static 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY; +import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN; +import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT; +import static org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW; + import java.util.Properties; import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; @@ -53,6 +58,10 @@ public class PhysicalOptimizationConfig { private static final String FORCE_JOIN_ORDER = "FORCE_JOIN_ORDER"; private static final String QUERY_PLAN_SHAPE = "QUERY_PLAN_SHAPE"; private static final String COLUMN_FILTER = "COLUMN_FILTER"; + private static final String MIN_SORT_FRAMES = "MIN_SORT_FRAMES"; + private static final String MIN_JOIN_FRAMES = "MIN_JOIN_FRAMES"; + private static final String MIN_GROUP_FRAMES = "MIN_GROUP_FRAMES"; + private static final String MIN_WINDOW_FRAMES = "MIN_WINDOW_FRAMES"; private final Properties properties = new Properties(); @@ -66,6 +75,11 @@ public class PhysicalOptimizationConfig { setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767); setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767); setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767); + + setInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT); + setInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN); + setInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY); + setInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW); } public int getFrameSize() { @@ -169,6 +183,54 @@ public class PhysicalOptimizationConfig { setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize); } + public int getMinSortFrames() { + return getInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT); + } + + public void setMinSortFrames(int minSortFrames) { + if (minSortFrames < MIN_FRAME_LIMIT_FOR_SORT) { + throw new 
IllegalArgumentException( + "Minimum sort frames is " + MIN_FRAME_LIMIT_FOR_SORT + ", got " + minSortFrames); + } + setInt(MIN_SORT_FRAMES, minSortFrames); + } + + public int getMinJoinFrames() { + return getInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN); + } + + public void setMinJoinFrames(int minJoinFrames) { + if (minJoinFrames < MIN_FRAME_LIMIT_FOR_JOIN) { + throw new IllegalArgumentException( + "Minimum join frames is " + MIN_FRAME_LIMIT_FOR_JOIN + ", got " + minJoinFrames); + } + setInt(MIN_JOIN_FRAMES, minJoinFrames); + } + + public int getMinGroupFrames() { + return getInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY); + } + + public void setMinGroupFrames(int minGroupFrames) { + if (minGroupFrames < MIN_FRAME_LIMIT_FOR_GROUP_BY) { + throw new IllegalArgumentException( + "Minimum group frames is " + MIN_FRAME_LIMIT_FOR_GROUP_BY + ", got " + minGroupFrames); + } + setInt(MIN_GROUP_FRAMES, minGroupFrames); + } + + public int getMinWindowFrames() { + return getInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW); + } + + public void setMinWindowFrames(int minWindowFrames) { + if (minWindowFrames < MIN_FRAME_LIMIT_FOR_WINDOW) { + throw new IllegalArgumentException( + "Minimum window frames is " + MIN_FRAME_LIMIT_FOR_WINDOW + ", got " + minWindowFrames); + } + setInt(MIN_WINDOW_FRAMES, minWindowFrames); + } + public boolean getSortParallel() { return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL_DEFAULT); } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java index 4e859e4430..0b87eae830 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java +++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java @@ -75,7 +75,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode; /** * Set memory requirements for all operators as follows: * <ol> - * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)} + * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator, PhysicalOptimizationConfig)} * to initialize each operator's {@link LocalMemoryRequirements} with minimal memory budget required by * that operator</li> * <li>Then increase memory requirements for certain operators as specified by {@link PhysicalOptimizationConfig}</li> @@ -97,19 +97,23 @@ public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule { if (physOp.getLocalMemoryRequirements() != null) { return false; } - computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context)); + computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context), context); return true; } private void computeLocalMemoryRequirements(AbstractLogicalOperator op, - ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException { + ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor, IOptimizationContext context) + throws AlgebricksException { IPhysicalOperator physOp = op.getPhysicalOperator(); if (physOp.getLocalMemoryRequirements() == null) { - physOp.createLocalMemoryRequirements(op); - if (physOp.getLocalMemoryRequirements() == null) { - throw new IllegalStateException(physOp.getOperatorTag().toString()); - } - if (memoryRequirementsVisitor != null) { + if (memoryRequirementsVisitor == null) { + // null means forcing the min memory budget from the physical optimization config + physOp.createLocalMemoryRequirements(op, context.getPhysicalOptimizationConfig()); + } else { + physOp.createLocalMemoryRequirements(op); + if 
(physOp.getLocalMemoryRequirements() == null) { + throw new IllegalStateException(physOp.getOperatorTag().toString()); + } op.accept(memoryRequirementsVisitor, null); } } @@ -117,13 +121,14 @@ public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule { AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op; for (ILogicalPlan p : nested.getNestedPlans()) { for (Mutable<ILogicalOperator> root : p.getRoots()) { - computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), - memoryRequirementsVisitor); + computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), memoryRequirementsVisitor, + context); } } } for (Mutable<ILogicalOperator> opRef : op.getInputs()) { - computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor); + computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor, + context); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 766fb26e57..59a4da43f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -157,6 +157,7 @@ public enum ErrorCode implements IError { INVALID_STRING_UNICODE(127), UNSUPPORTED_WRITE_SPEC(128), JOB_REJECTED(129), + FRAME_BIGGER_THAN_SORT_MEMORY(130), // Compilation error codes. 
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000), diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index fa52bc605e..7da6bbd040 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -147,6 +147,7 @@ 127 = Decoding error - %1$s 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s 129 = Job %1$s not run. Cluster is not accepting jobs +130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget. 10000 = The given rule collection %1$s is not an instance of the List class. 10001 = Cannot compose partition constraint %1$s with %2$s diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java index 20924d5544..d4dfae16dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java @@ -31,6 +31,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -150,9 +151,8 @@ 
public abstract class AbstractFrameSorter implements IFrameSorter { return true; } if (getFrameCount() == 0) { - throw new HyracksDataException("The required memory=" + requiredMemory + " for the frame data=" - + inputTupleAccessor.getBuffer().capacity() + " is too big for the sorting buffer. Used=" - + totalMemoryUsed + ", max=" + maxSortMemory + ", please allocate bigger buffer size"); + throw HyracksDataException.create(ErrorCode.FRAME_BIGGER_THAN_SORT_MEMORY, + inputTupleAccessor.getBuffer().capacity(), requiredMemory, totalMemoryUsed, maxSortMemory); } return false; }
