This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e86da5  [NO ISSUE][COMP] Make memory requirements an operator property
1e86da5 is described below

commit 1e86da50fa7cca0f079f87622a414422d2286fe9
Author: Dmitry Lychagin <[email protected]>
AuthorDate: Tue May 7 11:35:45 2019 -0700

    [NO ISSUE][COMP] Make memory requirements an operator property
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Refactor how memory requirements are maintained in a query plan
    - Introduce LocalMemoryRequirements class. Its instances are held
      by each physical operator and could be altered by the optimizer.
    - Introduce optimizer rule SetMemoryRequirementsRule that
      initializes and configures memory requirements for each operator
    
    Change-Id: I3481ddfe163c6ce786290c540cbd05db16a7f64f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3374
    Contrib: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../operators/physical/InvertedIndexPOperator.java |  18 +-
 .../asterix/optimizer/base/RuleCollections.java    |   9 +-
 .../optimizer/rules/PushLimitIntoOrderByRule.java  |   5 +-
 .../rules/SetAsterixPhysicalOperatorsRule.java     |  13 +-
 .../app/resource/OperatorResourcesComputer.java    | 103 +-----
 .../org/apache/asterix/utils/ResourceUtils.java    |  16 +-
 .../app/resource/PlanStagesGeneratorTest.java      |  85 +++--
 .../common/config/OptimizationConfUtil.java        |  16 +-
 .../core/algebra/base/IPhysicalOperator.java       |  13 +-
 .../physical/AbstractGroupByPOperator.java         |  15 +-
 .../operators/physical/AbstractJoinPOperator.java  |   9 +
 .../physical/AbstractPhysicalOperator.java         |  17 +-
 .../AbstractPreclusteredGroupByPOperator.java      |   4 +-
 .../physical/AbstractStableSortPOperator.java      |  13 +-
 .../physical/ExternalGroupByPOperator.java         |  14 +-
 .../physical/HybridHashJoinPOperator.java          |  27 +-
 .../physical/InMemoryHashJoinPOperator.java        |  12 +-
 .../MicroPreclusteredGroupByPOperator.java         |   5 +-
 .../physical/MicroStableSortPOperator.java         |   4 +-
 .../physical/NestedLoopJoinPOperator.java          |   8 +-
 .../physical/PreclusteredGroupByPOperator.java     |   5 +-
 .../operators/physical/SortGroupByPOperator.java   |   5 +-
 .../operators/physical/StableSortPOperator.java    |   8 +-
 .../operators/physical/WindowPOperator.java        |  20 +-
 .../operators/physical/WindowStreamPOperator.java  |  10 +
 .../properties/LocalMemoryRequirements.java        | 102 ++++++
 .../rules/EnforceStructuralPropertiesRule.java     |  14 +-
 .../rules/HybridToInMemoryHashJoinRule.java        |  54 +++
 .../rewriter/rules/PushGroupByIntoSortRule.java    |   1 -
 .../rules/SetAlgebricksPhysicalOperatorsRule.java  |  35 +-
 .../rewriter/rules/SetMemoryRequirementsRule.java  | 365 +++++++++++++++++++++
 .../algebricks/rewriter/util/JoinUtils.java        |  34 +-
 32 files changed, 792 insertions(+), 267 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 98feafa..491911b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -18,9 +18,6 @@
  */
 package org.apache.asterix.algebra.operators.physical;
 
-import java.util.Map;
-
-import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSourceId;
@@ -50,6 +47,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -68,6 +66,11 @@ import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokeniz
  * inverted-index search.
  */
 public class InvertedIndexPOperator extends IndexSearchPOperator {
+
+    // variable memory, min 5 frames
+    // 1 for query + 2 for intermediate results + 1 for final result + 1 for 
reading an inverted list
+    public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 
OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_TEXT_SEARCH;
+
     private final boolean isPartitioned;
 
     public InvertedIndexPOperator(IDataSourceIndex<String, DataSourceId> idx, 
INodeDomain domain,
@@ -86,6 +89,11 @@ public class InvertedIndexPOperator extends 
IndexSearchPOperator {
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH);
+    }
+
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
@@ -114,9 +122,7 @@ public class InvertedIndexPOperator extends 
IndexSearchPOperator {
             retainNull = true;
         }
         // In-memory budget (frame limit) for inverted-index search operations
-        CompilerProperties compilerProp = 
metadataProvider.getApplicationContext().getCompilerProperties();
-        Map<String, Object> queryConfig = metadataProvider.getConfig();
-        int frameLimit = 
OptimizationConfUtil.getTextSearchNumFrames(compilerProp, queryConfig, 
op.getSourceLocation());
+        int frameLimit = localMemoryRequirements.getMemoryBudgetInFrames();
 
         // Build runtime.
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
invIndexSearch =
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 6a11abf..531cbf3 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -105,6 +105,7 @@ import 
org.apache.hyracks.algebricks.rewriter.rules.ExtractCommonOperatorsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ExtractGbyExpressionsRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.ExtractGroupByDecorVariablesRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
+import 
org.apache.hyracks.algebricks.rewriter.rules.HybridToInMemoryHashJoinRule;
 import org.apache.hyracks.algebricks.rewriter.rules.InferTypesRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.InlineAssignIntoAggregateRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.InlineSingleReferenceVariablesRule;
@@ -133,6 +134,7 @@ import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeEx
 import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
+import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateIsomorphicSubplanRule;
@@ -359,6 +361,9 @@ public final class RuleCollections {
         //Turned off the following rule for now not to change OptimizerTest 
results.
         physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+        physicalRewritesAllLevels.add(new SetMemoryRequirementsRule());
+        // must run after SetMemoryRequirementsRule
+        physicalRewritesAllLevels.add(new HybridToInMemoryHashJoinRule());
         physicalRewritesAllLevels.add(new 
AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
         physicalRewritesAllLevels.add(new 
EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP,
@@ -403,11 +408,11 @@ public final class RuleCollections {
                 .add(new 
IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
         prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
-        // Re-infer all types, so that, e.g., the effect of not-is-null is
-        // propagated.
+        // Re-infer all types, so that, e.g., the effect of not-is-null is 
propagated
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());
         prepareForJobGenRewrites.add(new PushGroupByIntoSortRule());
         prepareForJobGenRewrites.add(new SetExecutionModeRule());
+        prepareForJobGenRewrites.add(new SetMemoryRequirementsRule());
         prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
         return prepareForJobGenRewrites;
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
index e64889d..51e536a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
@@ -35,7 +35,6 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
 /**
  * If an ORDER operator is followed by LIMIT, then we can push LIMIT into 
ORDER operator.
@@ -89,7 +88,6 @@ public class PushLimitIntoOrderByRule implements 
IAlgebraicRewriteRule {
      */
     private boolean pushLimitIntoOrder(Mutable<ILogicalOperator> opRef, 
Mutable<ILogicalOperator> opRef2,
             IOptimizationContext context) throws AlgebricksException {
-        PhysicalOptimizationConfig physicalOptimizationConfig = 
context.getPhysicalOptimizationConfig();
         LimitOperator limitOp = (LimitOperator) opRef.getValue();
         OrderOperator orderOp = (OrderOperator) opRef2.getValue();
 
@@ -106,8 +104,7 @@ public class PushLimitIntoOrderByRule implements 
IAlgebraicRewriteRule {
         // Create the new ORDER operator, set the topK value, and replace the 
current one.
         OrderOperator newOrderOp = new 
OrderOperator(orderOp.getOrderExpressions(), topK);
         newOrderOp.setSourceLocation(orderOp.getSourceLocation());
-        newOrderOp.setPhysicalOperator(
-                new 
StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), 
newOrderOp.getTopK()));
+        newOrderOp.setPhysicalOperator(new 
StableSortPOperator(newOrderOp.getTopK()));
         newOrderOp.getInputs().addAll(orderOp.getInputs());
         newOrderOp.setExecutionMode(orderOp.getExecutionMode());
         newOrderOp.recomputeSchema();
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 69eecfd..31de7ee 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -130,11 +130,7 @@ public final class SetAsterixPhysicalOperatorsRule extends 
SetAlgebricksPhysical
             }
 
             generateMergeAggregationExpressions(gby);
-
-            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                            * physicalOptimizationConfig.getFrameSize());
+            return new ExternalGroupByPOperator(gby.getGroupByVarList());
         }
 
         private void generateMergeAggregationExpressions(GroupByOperator gby) 
throws AlgebricksException {
@@ -252,12 +248,11 @@ public final class SetAsterixPhysicalOperatorsRule 
extends SetAlgebricksPhysical
                 boolean nestedTrivialAggregates =
                         
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
                 return new WindowPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList(),
-                        frameStartIsMonotonic, frameEndIsMonotonic, 
nestedTrivialAggregates,
-                        
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+                        frameStartIsMonotonic, frameEndIsMonotonic, 
nestedTrivialAggregates);
             } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
                     
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
-                return new WindowPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList(), false, false, false,
-                        
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+                return new WindowPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList(), false, false,
+                        false);
             } else {
                 return new WindowStreamPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList());
             }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a2c1c33..e8bf46a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -20,11 +20,11 @@ package org.apache.asterix.app.resource;
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public class OperatorResourcesComputer {
 
@@ -32,21 +32,10 @@ public class OperatorResourcesComputer {
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
     private final int numComputationPartitions;
-    private final long groupByMemorySize;
-    private final long joinMemorySize;
-    private final long sortMemorySize;
-    private final long windowMemorySize;
-    private final long textSearchMemorySize;
     private final long frameSize;
 
-    public OperatorResourcesComputer(int numComputationPartitions, int 
sortFrameLimit, int groupFrameLimit,
-            int joinFrameLimit, int windowFrameLimit, int 
textSearchFrameLimit, long frameSize) {
+    public OperatorResourcesComputer(int numComputationPartitions, long 
frameSize) {
         this.numComputationPartitions = numComputationPartitions;
-        this.groupByMemorySize = groupFrameLimit * frameSize;
-        this.joinMemorySize = joinFrameLimit * frameSize;
-        this.sortMemorySize = sortFrameLimit * frameSize;
-        this.windowMemorySize = windowFrameLimit * frameSize;
-        this.textSearchMemorySize = textSearchFrameLimit * frameSize;
         this.frameSize = frameSize;
     }
 
@@ -59,79 +48,25 @@ public class OperatorResourcesComputer {
     }
 
     public long getOperatorRequiredMemory(ILogicalOperator operator) {
-        switch (operator.getOperatorTag()) {
-            case AGGREGATE:
-            case ASSIGN:
-            case DATASOURCESCAN:
-            case DISTINCT:
-            case DISTRIBUTE_RESULT:
-            case EMPTYTUPLESOURCE:
-            case DELEGATE_OPERATOR:
-            case EXTERNAL_LOOKUP:
-            case LIMIT:
-            case MATERIALIZE:
-            case NESTEDTUPLESOURCE:
-            case PROJECT:
-            case REPLICATE:
-            case RUNNINGAGGREGATE:
-            case SCRIPT:
-            case SELECT:
-            case SINK:
-            case SPLIT:
-            case SUBPLAN:
-            case TOKENIZE:
-            case UNIONALL:
-            case UNNEST:
-            case LEFT_OUTER_UNNEST:
-            case UPDATE:
-            case WRITE:
-            case WRITE_RESULT:
-            case INDEX_INSERT_DELETE_UPSERT:
-            case INSERT_DELETE_UPSERT:
-            case INTERSECT:
-            case FORWARD:
-                return getOperatorRequiredMemory(operator, frameSize);
-            case LEFT_OUTER_UNNEST_MAP:
-            case UNNEST_MAP:
-                // Since an inverted-index search requires certain amount of 
memory, needs to calculate
-                // the memory size differently if the given index-search is an 
inverted-index search.
-                long unnestMapMemorySize = frameSize;
-                if (isInvertedIndexSearch((AbstractUnnestMapOperator) 
operator)) {
-                    unnestMapMemorySize += textSearchMemorySize;
-                }
-                return getOperatorRequiredMemory(operator, 
unnestMapMemorySize);
-            case EXCHANGE:
-                return getExchangeRequiredMemory((ExchangeOperator) operator);
-            case GROUP:
-                return getOperatorRequiredMemory(operator, groupByMemorySize);
-            case ORDER:
-                return getOperatorRequiredMemory(operator, sortMemorySize);
-            case INNERJOIN:
-            case LEFTOUTERJOIN:
-                return getOperatorRequiredMemory(operator, joinMemorySize);
-            case WINDOW:
-                return getWindowRequiredMemory((WindowOperator) operator);
-            default:
-                throw new IllegalStateException("Unrecognized operator: " + 
operator.getOperatorTag());
+        if (operator.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+            return getExchangeRequiredMemory((ExchangeOperator) operator);
+        } else {
+            IPhysicalOperator physOp = ((AbstractLogicalOperator) 
operator).getPhysicalOperator();
+            return getOperatorRequiredMemory(operator.getExecutionMode(), 
physOp.getLocalMemoryRequirements());
         }
     }
 
-    private long getOperatorRequiredMemory(ILogicalOperator op, long 
memorySize) {
-        if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED
-                || op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.LOCAL) {
+    private long 
getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, 
long memorySize) {
+        if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
             return memorySize * numComputationPartitions;
         }
         return memorySize;
     }
 
-    private boolean isInvertedIndexSearch(AbstractUnnestMapOperator op) {
-        IPhysicalOperator physicalOperator = op.getPhysicalOperator();
-        final PhysicalOperatorTag physicalOperatorTag = 
physicalOperator.getOperatorTag();
-        if (physicalOperatorTag == 
PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
-                || physicalOperatorTag == 
PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
-            return true;
-        }
-        return false;
+    private long 
getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode,
+            LocalMemoryRequirements memoryReqs) {
+        return getOperatorRequiredMemory(opExecMode, 
memoryReqs.getMemoryBudgetInBytes(frameSize));
     }
 
     private long getExchangeRequiredMemory(ExchangeOperator op) {
@@ -139,16 +74,8 @@ public class OperatorResourcesComputer {
         final PhysicalOperatorTag physicalOperatorTag = 
physicalOperator.getOperatorTag();
         if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
                 || physicalOperatorTag == 
PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
-            return getOperatorRequiredMemory(op, frameSize);
+            return getOperatorRequiredMemory(op.getExecutionMode(), frameSize);
         }
         return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * 
numComputationPartitions * frameSize;
     }
-
-    private long getWindowRequiredMemory(WindowOperator op) {
-        // memory budget configuration only applies to window operators that 
materialize partitions (non-streaming)
-        // streaming window operators only need 2 frames: output + 
(conservative estimate) last frame partition columns
-        long memorySize = op.getPhysicalOperator().getOperatorTag() == 
PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
-                : windowMemorySize;
-        return getOperatorRequiredMemory(op, memorySize);
-    }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 0f4c4c0..ba11956 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -55,14 +55,8 @@ public class ResourceUtils {
             AlgebricksAbsolutePartitionConstraint computationLocations,
             PhysicalOptimizationConfig physicalOptimizationConfig) throws 
AlgebricksException {
         final int frameSize = physicalOptimizationConfig.getFrameSize();
-        final int sortFrameLimit = 
physicalOptimizationConfig.getMaxFramesExternalSort();
-        final int groupFrameLimit = 
physicalOptimizationConfig.getMaxFramesForGroupBy();
-        final int joinFrameLimit = 
physicalOptimizationConfig.getMaxFramesForJoin();
-        final int windowFrameLimit = 
physicalOptimizationConfig.getMaxFramesForWindow();
-        final int textSearchFrameLimit = 
physicalOptimizationConfig.getMaxFramesForTextSearch();
         final List<PlanStage> planStages = getStages(plan);
-        return getStageBasedRequiredCapacity(planStages, 
computationLocations.getLocations().length, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, windowFrameLimit, 
textSearchFrameLimit, frameSize);
+        return getStageBasedRequiredCapacity(planStages, 
computationLocations.getLocations().length, frameSize);
     }
 
     public static List<PlanStage> getStages(ILogicalPlan plan) throws 
AlgebricksException {
@@ -74,15 +68,13 @@ public class ResourceUtils {
     }
 
     public static IClusterCapacity 
getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
-            int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int 
windowFrameLimit, int textSearchFrameLimit,
             int frameSize) {
-        final OperatorResourcesComputer computer = new 
OperatorResourcesComputer(computationLocations, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, windowFrameLimit, 
textSearchFrameLimit, frameSize);
+        final OperatorResourcesComputer computer = new 
OperatorResourcesComputer(computationLocations, frameSize);
         final IClusterCapacity clusterCapacity = new ClusterCapacity();
-        final Long maxRequiredMemory = stages.stream().mapToLong(stage -> 
stage.getRequiredMemory(computer)).max()
+        final long maxRequiredMemory = stages.stream().mapToLong(stage -> 
stage.getRequiredMemory(computer)).max()
                 .orElseThrow(IllegalStateException::new);
         clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory);
-        final Integer maxRequireCores = stages.stream().mapToInt(stage -> 
stage.getRequiredCores(computer)).max()
+        final int maxRequireCores = stages.stream().mapToInt(stage -> 
stage.getRequiredCores(computer)).max()
                 .orElseThrow(IllegalStateException::new);
         clusterCapacity.setAggregatedCores(maxRequireCores);
         return clusterCapacity;
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index 094009e..b8beccc 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
@@ -51,10 +52,20 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOpe
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.junit.Assert;
@@ -64,9 +75,7 @@ public class PlanStagesGeneratorTest {
 
     private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
             new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
-    private static final long MEMORY_BUDGET = 33554432L;
     private static final int FRAME_SIZE = 32768;
-    private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
     private static final int PARALLELISM = 10;
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
@@ -74,9 +83,11 @@ public class PlanStagesGeneratorTest {
     public void noBlockingPlan() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(UNPARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         AssignOperator assignOperator = new 
AssignOperator(Collections.emptyList(), null);
         assignOperator.setExecutionMode(UNPARTITIONED);
+        assignOperator.setPhysicalOperator(new AssignPOperator());
         assignOperator.getInputs().add(new MutableObject<>(ets));
 
         ExchangeOperator exchange = new ExchangeOperator();
@@ -86,8 +97,9 @@ public class PlanStagesGeneratorTest {
 
         DistributeResultOperator resultOperator = new 
DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(UNPARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(exchange));
-        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new 
MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         // ensure a single stage plan
@@ -103,9 +115,11 @@ public class PlanStagesGeneratorTest {
     public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(PARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator = new 
DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator.getInputs().add(new MutableObject<>(ets));
 
         ExchangeOperator exchange = new ExchangeOperator();
@@ -115,18 +129,19 @@ public class PlanStagesGeneratorTest {
 
         GroupByOperator groupByOperator = new GroupByOperator();
         groupByOperator.setExecutionMode(PARTITIONED);
-        groupByOperator
-                .setPhysicalOperator(new 
PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+        groupByOperator.setPhysicalOperator(new 
PreclusteredGroupByPOperator(Collections.emptyList(), true));
         groupByOperator.getInputs().add(new MutableObject<>(exchange));
 
         OrderOperator orderOperator = new OrderOperator();
         orderOperator.setExecutionMode(PARTITIONED);
+        orderOperator.setPhysicalOperator(new StableSortPOperator());
         orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
 
         DistributeResultOperator resultOperator = new 
DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(orderOperator));
-        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new 
MutableObject<>(resultOperator)));
 
         final List<PlanStage> stages = ResourceUtils.getStages(plan);
         validateStages(stages, ets, exchange, groupByOperator, orderOperator, 
resultOperator);
@@ -136,8 +151,10 @@ public class PlanStagesGeneratorTest {
 
         // dominating stage should have orderBy, orderBy's input (groupby), 
groupby's input (exchange),
         // exchange's input (scanOperator), and scanOperator's input (ets)
-        long orderOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * 
PARALLELISM;
-        long groupByOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * 
PARALLELISM;
+        long orderOperatorRequiredMemory =
+                AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * 
FRAME_SIZE * PARALLELISM;
+        long groupByOperatorRequiredMemory =
+                AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY * 
FRAME_SIZE * PARALLELISM;
         long exchangeRequiredMemory = PARALLELISM * FRAME_SIZE;
         long scanOperatorRequiredMemory = PARALLELISM * FRAME_SIZE;
         long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
@@ -151,20 +168,26 @@ public class PlanStagesGeneratorTest {
     public void testJoinGroupby() throws AlgebricksException {
         EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
         ets1.setExecutionMode(PARTITIONED);
+        ets1.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator1 = new 
DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator1.setExecutionMode(PARTITIONED);
+        scanOperator1.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator1.getInputs().add(new MutableObject<>(ets1));
 
         EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
-        ets1.setExecutionMode(PARTITIONED);
+        ets2.setExecutionMode(PARTITIONED);
+        ets2.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator2 = new 
DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator2.setExecutionMode(PARTITIONED);
+        scanOperator2.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator2.getInputs().add(new MutableObject<>(ets2));
 
         InnerJoinOperator firstJoin = new InnerJoinOperator(new 
MutableObject<>(ConstantExpression.TRUE));
         firstJoin.setExecutionMode(PARTITIONED);
+        firstJoin.setPhysicalOperator(new 
NestedLoopJoinPOperator(firstJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
         firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
 
@@ -174,11 +197,11 @@ public class PlanStagesGeneratorTest {
         exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
 
         EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
-        ets1.setExecutionMode(PARTITIONED);
+        ets3.setExecutionMode(PARTITIONED);
+        ets3.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         GroupByOperator groupByOperator = new GroupByOperator();
-        groupByOperator
-                .setPhysicalOperator(new 
ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+        groupByOperator.setPhysicalOperator(new 
ExternalGroupByPOperator(Collections.emptyList()));
         groupByOperator.setExecutionMode(LOCAL);
         groupByOperator.getInputs().add(new MutableObject<>(ets3));
 
@@ -189,13 +212,16 @@ public class PlanStagesGeneratorTest {
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new 
MutableObject<>(ConstantExpression.TRUE));
         secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.setPhysicalOperator(new 
NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
 
         DistributeResultOperator resultOperator = new 
DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
-        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new 
MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         final int expectedStages = 4;
@@ -207,9 +233,9 @@ public class PlanStagesGeneratorTest {
         // resultOperator, its input (secondJoin), secondJoin's first input 
(exchangeOperator1), exchangeOperator1's
         // input (firstJoin), firstJoin's first input (scanOperator1), and 
scanOperator1's input (ets1)
         long resultOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
-        long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long secondJoinRequiredMemory = 
AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
         long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * 
PARALLELISM * PARALLELISM * FRAME_SIZE;
-        long firstJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long firstJoinRequiredMemory = 
AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
         long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
         long ets1RequiredMemory = FRAME_SIZE * PARALLELISM;
 
@@ -222,34 +248,40 @@ public class PlanStagesGeneratorTest {
     public void testReplicateSortJoin() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(PARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator = new 
DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator.getInputs().add(new MutableObject<>(ets));
 
         ReplicateOperator replicateOperator = new ReplicateOperator(2);
         replicateOperator.setExecutionMode(PARTITIONED);
+        replicateOperator.setPhysicalOperator(new ReplicatePOperator());
         replicateOperator.getInputs().add(new MutableObject<>(scanOperator));
 
         OrderOperator order1 = new OrderOperator();
         order1.setExecutionMode(PARTITIONED);
-        order1.setPhysicalOperator(new OneToOneExchangePOperator());
+        order1.setPhysicalOperator(new StableSortPOperator());
         order1.getInputs().add(new MutableObject<>(replicateOperator));
 
         OrderOperator order2 = new OrderOperator();
         order2.setExecutionMode(PARTITIONED);
-        order2.setPhysicalOperator(new OneToOneExchangePOperator());
+        order2.setPhysicalOperator(new StableSortPOperator());
         order2.getInputs().add(new MutableObject<>(replicateOperator));
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new 
MutableObject<>(ConstantExpression.TRUE));
         secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.setPhysicalOperator(new 
NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         secondJoin.getInputs().add(new MutableObject<>(order1));
         secondJoin.getInputs().add(new MutableObject<>(order2));
 
         DistributeResultOperator resultOperator = new 
DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
-        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new 
ALogicalPlanImpl(Collections.singletonList(new 
MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         final int expectedStages = 3;
@@ -257,14 +289,14 @@ public class PlanStagesGeneratorTest {
         validateStages(stages);
 
         // dominating stage should have the following operators:
-        // secondJoin, secondJoin's second input (order2), order2's input 
(replicate),
+        // order1, order2, order1 and order2's input (replicate),
         // replicate's input (scanOperator), scanOperator's input (ets)
-        long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
-        long order2RequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long order1RequiredMemory = 
AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
+        long order2RequiredMemory = 
AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
         long replicateOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
         long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
         long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
-        long expectedMemory = secondJoinRequiredMemory + order2RequiredMemory 
+ replicateOperatorRequiredMemory
+        long expectedMemory = order1RequiredMemory + order2RequiredMemory + 
replicateOperatorRequiredMemory
                 + scanOperator1RequiredMemory + etsRequiredMemory;
         assertRequiredMemory(stages, expectedMemory);
     }
@@ -300,8 +332,13 @@ public class PlanStagesGeneratorTest {
     }
 
     private void assertRequiredMemory(List<PlanStage> stages, long 
expectedMemory) {
-        final IClusterCapacity clusterCapacity = 
ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
-                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, 
FRAME_LIMIT, FRAME_SIZE);
+        for (PlanStage stage : stages) {
+            for (ILogicalOperator op : stage.getOperators()) {
+                ((AbstractLogicalOperator) 
op).getPhysicalOperator().createLocalMemoryRequirements(op);
+            }
+        }
+        final IClusterCapacity clusterCapacity =
+                ResourceUtils.getStageBasedRequiredCapacity(stages, 
PARALLELISM, FRAME_SIZE);
         Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), 
expectedMemory);
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index e8643cd..b29cb61 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -23,6 +23,10 @@ import java.util.Map;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -30,13 +34,11 @@ import org.apache.hyracks.control.common.config.OptionTypes;
 
 public class OptimizationConfUtil {
 
-    private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
-    private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
-    private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
-    // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable 
partition reader)
-    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
-    // one for query, two for intermediate results, one for final result, and 
one for reading an inverted list
-    private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
+    private static final int MIN_FRAME_LIMIT_FOR_SORT = 
AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+    private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 
AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+    private static final int MIN_FRAME_LIMIT_FOR_JOIN = 
AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 
WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+    public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see 
InvertedIndexPOperator
 
     private OptimizationConfUtil() {
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a88ec64..a5e22cd 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -22,6 +22,7 @@ import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 
@@ -51,6 +52,10 @@ public interface IPhysicalOperator {
     public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context)
             throws AlgebricksException;
 
+    public LocalMemoryRequirements getLocalMemoryRequirements();
+
+    public void createLocalMemoryRequirements(ILogicalOperator op);
+
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException;
@@ -72,10 +77,10 @@ public interface IPhysicalOperator {
     public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator 
op);
 
     /*
-     * This is needed to have a kind of cost based decision on whether to 
merge the shared subplans and materialize the result.
-     * If the subgraph whose result we would like to materialize has an 
operator that is computationally expensive, we assume
-     * it is cheaper to materialize the result of this subgraph and read from 
the file rather than recomputing it.
+     * This is needed to have a kind of cost based decision on whether to 
merge the shared subplans and materialize
+     * the result. If the subgraph whose result we would like to materialize 
has an operator that is computationally
+     * expensive, we assume it is cheaper to materialize the result of this 
subgraph and read from the file rather
+     * than recomputing it.
      */
     public boolean expensiveThanMaterialization();
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
index ce6dedc..17c60b9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -21,17 +21,19 @@ package 
org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public abstract class AbstractGroupByPOperator extends 
AbstractPhysicalOperator {
 
-    protected List<LogicalVariable> columnList;
+    // variable memory, min 4 frames
+    public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
 
-    protected final int framesLimit;
+    protected List<LogicalVariable> columnList;
 
-    protected AbstractGroupByPOperator(List<LogicalVariable> columnList, int 
framesLimit) {
+    protected AbstractGroupByPOperator(List<LogicalVariable> columnList) {
         this.columnList = columnList;
-        this.framesLimit = framesLimit;
     }
 
     public List<LogicalVariable> getGroupByColumns() {
@@ -48,6 +50,11 @@ public abstract class AbstractGroupByPOperator extends 
AbstractPhysicalOperator
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_GROUP_BY);
+    }
+
+    @Override
     public String toString() {
         return getOperatorTag().toString() + columnList;
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index aea9b3e..c300392 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -21,9 +21,13 @@ package 
org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
 
+    // variable memory, min 5 frames
+    public static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+
     public enum JoinPartitioningType {
         PAIRWISE,
         BROADCAST
@@ -56,4 +60,9 @@ public abstract class AbstractJoinPOperator extends 
AbstractPhysicalOperator {
     public boolean expensiveThanMaterialization() {
         return true;
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN);
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 29d6037..96fbe3e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -37,6 +37,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOper
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -50,6 +51,7 @@ import org.apache.hyracks.api.job.JobSpecification;
 public abstract class AbstractPhysicalOperator implements IPhysicalOperator {
 
     protected IPhysicalPropertiesVector deliveredProperties;
+    protected LocalMemoryRequirements localMemoryRequirements;
     private boolean disableJobGenBelow = false;
     private Object hostQueryContext;
 
@@ -88,6 +90,16 @@ public abstract class AbstractPhysicalOperator implements 
IPhysicalOperator {
     }
 
     @Override
+    public LocalMemoryRequirements getLocalMemoryRequirements() {
+        return localMemoryRequirements;
+    }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1);
+    }
+
+    @Override
     public void disableJobGenBelowMe() {
         this.disableJobGenBelow = true;
     }
@@ -141,14 +153,13 @@ public abstract class AbstractPhysicalOperator implements 
IPhysicalOperator {
         List<List<AlgebricksPipeline>> subplans = new 
ArrayList<>(npOp.getNestedPlans().size());
         PlanCompiler pc = new PlanCompiler(context);
         for (ILogicalPlan p : npOp.getNestedPlans()) {
-            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, 
opSchema, pc));
+            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, 
opSchema, pc));
         }
         return subplans;
     }
 
     private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan 
p, IOperatorSchema outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
PlanCompiler pc)
-            throws AlgebricksException {
+            IOperatorSchema opSchema, PlanCompiler pc) throws 
AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots 
are not supported.");
         }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 23a411c..fdb7347 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -59,8 +59,8 @@ import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
 public abstract class AbstractPreclusteredGroupByPOperator extends 
AbstractGroupByPOperator {
 
-    protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> 
columnList, int framesLimit) {
-        super(columnList, framesLimit);
+    protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> 
columnList) {
+        super(columnList);
     }
 
     // Obs: We don't propagate properties corresponding to decors, since they
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 81852d4..eaec772 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
@@ -49,12 +50,13 @@ import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
 
 public abstract class AbstractStableSortPOperator extends 
AbstractPhysicalOperator {
 
-    final int maxNumberOfFrames;
+    // variable memory, min 3 frames
+    public static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
+
     OrderColumn[] sortColumns;
     ILocalStructuralProperty orderProp;
 
-    AbstractStableSortPOperator(int maxNumberOfFrames) {
-        this.maxNumberOfFrames = maxNumberOfFrames;
+    AbstractStableSortPOperator() {
     }
 
     public OrderColumn[] getSortColumns() {
@@ -163,4 +165,9 @@ public abstract class AbstractStableSortPOperator extends 
AbstractPhysicalOperat
                 && clusterDomain.cardinality() != null && 
clusterDomain.cardinality() > 1
                 && ctx.getPhysicalOptimizationConfig().getSortParallel();
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT);
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 927beae..bf0a824 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -68,11 +68,8 @@ import 
org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescr
 
 public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
 
-    private final long inputSize;
-
-    public ExternalGroupByPOperator(List<LogicalVariable> columnList, int 
framesLimit, long fileSize) {
-        super(columnList, framesLimit);
-        this.inputSize = fileSize;
+    public ExternalGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
     }
 
     @Override
@@ -238,11 +235,14 @@ public class ExternalGroupByPOperator extends 
AbstractGroupByPOperator {
                 
JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, 
context);
 
         // Calculates the hash table size (# of unique hash values) based on 
the budget and a tuple size.
-        int memoryBudgetInBytes = context.getFrameSize() * framesLimit;
+        int frameSize = context.getFrameSize();
+        long memoryBudgetInBytes = 
localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
         int groupByColumnsCount = gby.getGroupByList().size() + numFds;
         int hashTableSize = 
ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
-                groupByColumnsCount, context.getFrameSize());
+                groupByColumnsCount, frameSize);
 
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
+        long inputSize = framesLimit * (long) frameSize;
         ExternalGroupOperatorDescriptor gbyOpDesc = new 
ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
                 keyAndDecFields, framesLimit, comparatorFactories, 
normalizedKeyFactory, aggregatorFactory,
                 mergeFactory, recordDescriptor, recordDescriptor, new 
HashSpillableTableFactory(hashFunctionFactories));
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6d8b6e3..8a56fbb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -60,8 +60,6 @@ import org.apache.logging.log4j.Logger;
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
-    // The maximum number of in-memory frames that this hash join can use.
-    private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
     private final double fudgeFactor;
 
@@ -69,17 +67,15 @@ public class HybridHashJoinPOperator extends 
AbstractHashJoinPOperator {
 
     public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> 
sideRightOfEqualities,
-            int memSizeInFrames, int maxInputSizeInFrames, int 
aveRecordsPerFrame, double fudgeFactor) {
+            int maxInputSizeInFrames, int aveRecordsPerFrame, double 
fudgeFactor) {
         super(kind, partitioningType, sideLeftOfEqualities, 
sideRightOfEqualities);
-        this.memSizeInFrames = memSizeInFrames;
         this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
         this.fudgeFactor = fudgeFactor;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" 
+ kind + ", JoinPartitioningType="
                     + partitioningType + ", List<LogicalVariable>=" + 
sideLeftOfEqualities + ", List<LogicalVariable>="
-                    + sideRightOfEqualities + ", int memSizeInFrames=" + 
memSizeInFrames
-                    + ", int maxInputSize0InFrames=" + maxInputSizeInFrames + 
", int aveRecordsPerFrame="
-                    + aveRecordsPerFrame + ", double fudgeFactor=" + 
fudgeFactor + ".");
+                    + sideRightOfEqualities + ", int maxInputSize0InFrames=" + 
maxInputSizeInFrames
+                    + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", 
double fudgeFactor=" + fudgeFactor + ".");
         }
     }
 
@@ -97,10 +93,6 @@ public class HybridHashJoinPOperator extends 
AbstractHashJoinPOperator {
         return fudgeFactor;
     }
 
-    public int getMemSizeInFrames() {
-        return memSizeInFrames;
-    }
-
     @Override
     public String toString() {
         return getOperatorTag().toString() + " " + keysLeftBranch + 
keysRightBranch;
@@ -156,11 +148,12 @@ public class HybridHashJoinPOperator extends 
AbstractHashJoinPOperator {
             IBinaryHashFunctionFamily[] rightHashFunFamilies, 
IBinaryComparatorFactory[] leftCompFactories,
             IBinaryComparatorFactory[] rightCompFactories, 
IPredicateEvaluatorFactory predEvaluatorFactory,
             RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) {
+        int memSizeInFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
         switch (kind) {
             case INNER:
-                return new OptimizedHybridHashJoinOperatorDescriptor(spec, 
getMemSizeInFrames(),
-                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, 
keysRight, leftHashFunFamilies,
-                        rightHashFunFamilies, leftCompFactories, 
rightCompFactories, recDescriptor,
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, 
memSizeInFrames, maxInputBuildSizeInFrames,
+                        getFudgeFactor(), keysLeft, keysRight, 
leftHashFunFamilies, rightHashFunFamilies,
+                        leftCompFactories, rightCompFactories, recDescriptor,
                         new JoinMultiComparatorFactory(leftCompFactories, 
keysLeft, keysRight),
                         new JoinMultiComparatorFactory(rightCompFactories, 
keysRight, keysLeft), predEvaluatorFactory);
             case LEFT_OUTER:
@@ -168,9 +161,9 @@ public class HybridHashJoinPOperator extends 
AbstractHashJoinPOperator {
                 for (int j = 0; j < nonMatchWriterFactories.length; j++) {
                     nonMatchWriterFactories[j] = 
context.getMissingWriterFactory();
                 }
-                return new OptimizedHybridHashJoinOperatorDescriptor(spec, 
getMemSizeInFrames(),
-                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, 
keysRight, leftHashFunFamilies,
-                        rightHashFunFamilies, leftCompFactories, 
rightCompFactories, recDescriptor,
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, 
memSizeInFrames, maxInputBuildSizeInFrames,
+                        getFudgeFactor(), keysLeft, keysRight, 
leftHashFunFamilies, rightHashFunFamilies,
+                        leftCompFactories, rightCompFactories, recDescriptor,
                         new JoinMultiComparatorFactory(leftCompFactories, 
keysLeft, keysRight),
                         new JoinMultiComparatorFactory(rightCompFactories, 
keysRight, keysLeft), predEvaluatorFactory,
                         true, nonMatchWriterFactories);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 152fcc6..90ae4cd 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -50,19 +50,15 @@ import 
org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private final int tableSize;
-    // The maximum number of in-memory frames that this hash join can use.
-    private final int memSizeInFrames;
 
     /**
      * builds on the first operator and probes on the second.
      */
 
     public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType,
-            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> 
sideRightOfEqualities, int tableSize,
-            int memSizeInFrames) {
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> 
sideRightOfEqualities, int tableSize) {
         super(kind, partitioningType, sideLeftOfEqualities, 
sideRightOfEqualities);
         this.tableSize = tableSize;
-        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -104,14 +100,16 @@ public class InMemoryHashJoinPOperator extends 
AbstractHashJoinPOperator {
 
         IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
                 context.getPredicateEvaluatorFactoryProvider();
-        IPredicateEvaluatorFactory predEvaluatorFactory = 
(predEvaluatorFactoryProvider == null ? null
-                : 
predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+        IPredicateEvaluatorFactory predEvaluatorFactory = 
predEvaluatorFactoryProvider == null ? null
+                : 
predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
 
         RecordDescriptor recDescriptor =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
 
+        int memSizeInFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
+
         switch (kind) {
             case INNER:
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, 
keysLeft, keysRight, leftHashFunFactories,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 350bcfb..2d70abe 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -39,8 +39,8 @@ import 
org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
 public class MicroPreclusteredGroupByPOperator extends 
AbstractPreclusteredGroupByPOperator {
 
-    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList, 
int framesLimit) {
-        super(columnList, framesLimit);
+    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) 
{
+        super(columnList);
     }
 
     @Override
@@ -72,6 +72,7 @@ public class MicroPreclusteredGroupByPOperator extends 
AbstractPreclusteredGroup
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
         RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), 
inputSchemas[0], context);
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         MicroPreClusteredGroupRuntimeFactory runtime = new 
MicroPreClusteredGroupRuntimeFactory(keys,
                 comparatorFactories, aggregatorFactory, inputRecordDesc, 
recordDescriptor, null, framesLimit);
         runtime.setSourceLocation(gby.getSourceLocation());
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 413c1a4..403ca57 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -39,8 +39,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class MicroStableSortPOperator extends AbstractStableSortPOperator {
 
-    public MicroStableSortPOperator(int maxNumberOfFrames) {
-        super(maxNumberOfFrames);
+    public MicroStableSortPOperator() {
     }
 
     @Override
@@ -80,6 +79,7 @@ public class MicroStableSortPOperator extends 
AbstractStableSortPOperator {
             i++;
         }
 
+        int maxNumberOfFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
         IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, 
nkcf, comps, null, maxNumberOfFrames);
         builder.contributeMicroOperator(op, runtime, recDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
index 048b129..524b336 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -63,11 +63,8 @@ import 
org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
  */
 public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
 
-    private final int memSize;
-
-    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType, int memSize) {
+    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType) {
         super(kind, partitioningType);
-        this.memSize = memSize;
     }
 
     @Override
@@ -141,6 +138,7 @@ public class NestedLoopJoinPOperator extends 
AbstractJoinPOperator {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
 
+        int memSize = localMemoryRequirements.getMemoryBudgetInFrames();
         switch (kind) {
             case INNER:
                 opDesc = new NestedLoopJoinOperatorDescriptor(spec, 
comparatorFactory, recDescriptor, memSize, false,
@@ -187,7 +185,6 @@ public class NestedLoopJoinPOperator extends 
AbstractJoinPOperator {
     }
 
     public static class TuplePairEvaluator implements ITuplePairComparator {
-        private final IHyracksTaskContext ctx;
         private IScalarEvaluator condEvaluator;
         private final IPointable p;
         private final CompositeFrameTupleReference compositeTupleRef;
@@ -197,7 +194,6 @@ public class NestedLoopJoinPOperator extends 
AbstractJoinPOperator {
 
         public TuplePairEvaluator(IHyracksTaskContext ctx, 
IScalarEvaluatorFactory condFactory,
                 IBinaryBooleanInspector binaryBooleanInspector) throws 
HyracksDataException {
-            this.ctx = ctx;
             this.condEvaluator = condFactory.createScalarEvaluator(ctx);
             this.binaryBooleanInspector = binaryBooleanInspector;
             this.leftRef = new FrameTupleReference();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index b6faa36..c6b49c2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -44,8 +44,8 @@ public class PreclusteredGroupByPOperator extends 
AbstractPreclusteredGroupByPOp
 
     private final boolean groupAll;
 
-    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, 
boolean groupAll, int framesLimit) {
-        super(columnList, framesLimit);
+    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, 
boolean groupAll) {
+        super(columnList);
         this.groupAll = groupAll;
     }
 
@@ -85,6 +85,7 @@ public class PreclusteredGroupByPOperator extends 
AbstractPreclusteredGroupByPOp
         RecordDescriptor recordDescriptor =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
 
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         PreclusteredGroupOperatorDescriptor opDesc = new 
PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor, 
groupAll, framesLimit);
         opDesc.setSourceLocation(gby.getSourceLocation());
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index af8161f..20e197f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -69,8 +69,8 @@ public class SortGroupByPOperator extends 
AbstractGroupByPOperator {
 
     private final OrderColumn[] orderColumns;
 
-    public SortGroupByPOperator(List<LogicalVariable> columnList, int 
framesLimit, OrderColumn[] orderColumns) {
-        super(columnList, framesLimit);
+    public SortGroupByPOperator(List<LogicalVariable> columnList, 
OrderColumn[] orderColumns) {
+        super(columnList);
         this.orderColumns = orderColumns;
     }
 
@@ -249,6 +249,7 @@ public class SortGroupByPOperator extends 
AbstractGroupByPOperator {
         normalizedKeyFactory =
                 orderColumns[0].getOrder() == OrderKind.ASC ? 
nkcfProvider.getNormalizedKeyComputerFactory(type, true)
                         : nkcfProvider.getNormalizedKeyComputerFactory(type, 
false);
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         SortGroupByOperatorDescriptor gbyOpDesc = new 
SortGroupByOperatorDescriptor(spec, framesLimit, keys,
                 keyAndDecFields, normalizedKeyFactory, compFactories, 
aggregatorFactory, mergeFactory,
                 partialAggRecordDescriptor, recordDescriptor, false);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 9567e5b..93c5c3b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -49,12 +49,11 @@ public class StableSortPOperator extends 
AbstractStableSortPOperator {
 
     private final int topK;
 
-    public StableSortPOperator(int maxNumberOfFrames) {
-        this(maxNumberOfFrames, -1);
+    public StableSortPOperator() {
+        this(-1);
     }
 
-    public StableSortPOperator(int maxNumberOfFrames, int topK) {
-        super(maxNumberOfFrames);
+    public StableSortPOperator(int topK) {
         this.topK = topK;
     }
 
@@ -98,6 +97,7 @@ public class StableSortPOperator extends 
AbstractStableSortPOperator {
             i++;
         }
 
+        int maxNumberOfFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
         AbstractSorterOperatorDescriptor sortOpDesc;
         // topK == -1 means that a topK value is not provided.
         if (topK == -1) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 23853e8..2a0b052 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -21,9 +21,11 @@ package 
org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -38,23 +40,22 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
 public final class WindowPOperator extends AbstractWindowPOperator {
 
+    // variable memory, min 5 frames:
+    // 1 (output) + 1 (input copy conservative) + 1 (partition writer) + 2 
(seekable partition reader)
+    public static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
+
     private final boolean frameStartIsMonotonic;
 
     private final boolean frameEndIsMonotonic;
 
     private final boolean nestedTrivialAggregates;
 
-    // The maximum number of in-memory frames that this operator can use.
-    private final int memSizeInFrames;
-
     public WindowPOperator(List<LogicalVariable> partitionColumns, 
List<OrderColumn> orderColumns,
-            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, 
boolean nestedTrivialAggregates,
-            int memSizeInFrames) {
+            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, 
boolean nestedTrivialAggregates) {
         super(partitionColumns, orderColumns);
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
-        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -63,6 +64,11 @@ public final class WindowPOperator extends 
AbstractWindowPOperator {
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_WINDOW);
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator 
winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueExprEvals,
@@ -74,6 +80,8 @@ public final class WindowPOperator extends 
AbstractWindowPOperator {
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int 
nestedAggOutSchemaSize,
             WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext 
context) {
 
+        int memSizeInFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
+
         // special cases
         if (!winOp.hasNestedPlans()) {
             return new WindowMaterializingRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
index 33b47ec..68476d1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -21,9 +21,11 @@ package 
org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -35,6 +37,9 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
 public final class WindowStreamPOperator extends AbstractWindowPOperator {
 
+    // fixed memory, 2 frames: 1 (output) + 1 (input copy conservative)
+    public static final int MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM = 2;
+
     public WindowStreamPOperator(List<LogicalVariable> partitionColumns, 
List<OrderColumn> orderColumns) {
         super(partitionColumns, orderColumns);
     }
@@ -45,6 +50,11 @@ public final class WindowStreamPOperator extends 
AbstractWindowPOperator {
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = 
LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM);
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator 
winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
new file mode 100644
index 0000000..38e318c
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.properties;
+
+public abstract class LocalMemoryRequirements {
+
+    public abstract int getMinMemoryBudgetInFrames();
+
+    public abstract int getMemoryBudgetInFrames();
+
+    public abstract void setMemoryBudgetInFrames(int value);
+
+    public final long getMemoryBudgetInBytes(long frameSize) {
+        return frameSize * getMemoryBudgetInFrames();
+    }
+
+    public static LocalMemoryRequirements fixedMemoryBudget(int 
memBudgetInFrames) {
+        if (memBudgetInFrames < 0) {
+            throw new 
IllegalArgumentException(String.valueOf(memBudgetInFrames));
+        }
+        return memBudgetInFrames == 
FixedMemoryBudget.ONE_FRAME.memBudgetInFrames ? FixedMemoryBudget.ONE_FRAME
+                : new FixedMemoryBudget(memBudgetInFrames);
+    }
+
+    private static final class FixedMemoryBudget extends 
LocalMemoryRequirements {
+
+        private static final FixedMemoryBudget ONE_FRAME = new 
FixedMemoryBudget(1);
+
+        private final int memBudgetInFrames;
+
+        private FixedMemoryBudget(int memBudgetInFrames) {
+            this.memBudgetInFrames = memBudgetInFrames;
+        }
+
+        @Override
+        public int getMinMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public int getMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public void setMemoryBudgetInFrames(int value) {
+            if (value != memBudgetInFrames) {
+                throw new IllegalArgumentException("Got " + value + ", 
expected " + memBudgetInFrames);
+            }
+        }
+    }
+
+    public static LocalMemoryRequirements variableMemoryBudget(int 
minMemBudgetInFrames) {
+        return new VariableMemoryBudget(minMemBudgetInFrames);
+    }
+
+    private static final class VariableMemoryBudget extends 
LocalMemoryRequirements {
+
+        private final int minMemBudgetInFrames;
+
+        private int memBudgetInFrames;
+
+        private VariableMemoryBudget(int minMemBudgetInFrames) {
+            this.memBudgetInFrames = this.minMemBudgetInFrames = 
minMemBudgetInFrames;
+        }
+
+        @Override
+        public int getMinMemoryBudgetInFrames() {
+            return minMemBudgetInFrames;
+        }
+
+        @Override
+        public int getMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public void setMemoryBudgetInFrames(int value) {
+            if (value < minMemBudgetInFrames) {
+                throw new IllegalArgumentException("Got " + value + ", 
expected " + minMemBudgetInFrames + " or more");
+            }
+            memBudgetInFrames = value;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index a011abf..749a63c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -100,7 +100,6 @@ import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
@@ -110,7 +109,6 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
 
     private static final String HASH_MERGE = "hash_merge";
     private static final String TRUE_CONSTANT = "true";
-    private PhysicalOptimizationConfig physicalOptimizationConfig;
     private final FunctionIdentifier rangeMapFunction;
     private final FunctionIdentifier localSamplingFun;
     private final FunctionIdentifier typePropagatingFun;
@@ -147,7 +145,6 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
         // These are actually logical constraints, so they could be 
pre-computed
         // somewhere else, too.
 
-        physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
         if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
             AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Optimizing operator 
" + op.getPhysicalOperator() + ".\n");
         }
@@ -213,7 +210,7 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
         boolean loggerTraceEnabled = 
AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
 
         // The child index of the child operator to optimize first.
-        int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
+        int startChildIndex = getStartChildIndex(op, pr, nestedPlan);
         IPartitioningProperty firstDeliveredPartitioning = null;
         // Enforce data properties in a top-down manner.
         for (j = 0; j < op.getInputs().size(); j++) {
@@ -328,8 +325,7 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
     // Gets the index of a child to start top-down data property enforcement.
     // If there is a partitioning-compatible child with the operator in opRef,
     // start from this child; otherwise, start from child zero.
-    private int getStartChildIndex(AbstractLogicalOperator op, 
PhysicalRequirements pr, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
+    private int getStartChildIndex(AbstractLogicalOperator op, 
PhysicalRequirements pr, boolean nestedPlan) {
         IPhysicalPropertiesVector[] reqdProperties = null;
         if (pr != null) {
             reqdProperties = pr.getRequiredProperties();
@@ -468,7 +464,7 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
         if (pp == null || pp.getPartitioningType() == 
PartitioningType.UNPARTITIONED) {
             addLocalEnforcers(op, childIndex, 
diffPropertiesVector.getLocalProperties(), nestedPlan, context);
             IPhysicalPropertiesVector deliveredByNewChild =
-                    ((AbstractLogicalOperator) 
op.getInputs().get(0).getValue()).getDeliveredPhysicalProperties();
+                    
op.getInputs().get(0).getValue().getDeliveredPhysicalProperties();
             if (!nestedPlan) {
                 addPartitioningEnforcers(op, childIndex, pp, required, 
deliveredByNewChild, domain, context);
             }
@@ -555,9 +551,9 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
         oo.setSourceLocation(sourceLoc);
         oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
         if (isMicroOp) {
-            oo.setPhysicalOperator(new 
MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+            oo.setPhysicalOperator(new MicroStableSortPOperator());
         } else {
-            oo.setPhysicalOperator(new 
StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+            oo.setPhysicalOperator(new StableSortPOperator());
         }
         oo.getInputs().add(topOp);
         context.computeAndSetTypeEnvironmentForOperator(oo);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
new file mode 100644
index 0000000..b1577bf
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+
+/**
+ * Must run after {@link SetMemoryRequirementsRule}
+ */
+public final class HybridToInMemoryHashJoinRule implements 
IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                || op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) 
op;
+            if (joinOp.getPhysicalOperator().getOperatorTag() == 
PhysicalOperatorTag.HYBRID_HASH_JOIN) {
+                return JoinUtils.hybridToInMemHashJoin(joinOp, context);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
+        return false;
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 4c57f21..8a42c31 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -104,7 +104,6 @@ public class PushGroupByIntoSortRule implements 
IAlgebraicRewriteRule {
                         //replace preclustered gby with sort gby
                         if (!groupByOperator.isGroupAll()) {
                             op.setPhysicalOperator(new 
SortGroupByPOperator(groupByOperator.getGroupByVarList(),
-                                    
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
                                     sortPhysicalOperator.getSortColumns()));
                         }
                         // remove the stable sort operator
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index e6cdc28..bc853f0 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -40,6 +40,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -174,11 +175,11 @@ public class SetAlgebricksPhysicalOperatorsRule 
implements IAlgebraicRewriteRule
 
         protected final IOptimizationContext context;
 
-        protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+        protected final PhysicalOptimizationConfig physConfig;
 
         protected 
AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
             this.context = context;
-            this.physicalOptimizationConfig = 
context.getPhysicalOptimizationConfig();
+            this.physConfig = context.getPhysicalOptimizationConfig();
         }
 
         @Override
@@ -222,11 +223,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
             }
 
             if (topLevelOp) {
-                return new 
PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
-                        
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+                return new 
PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll());
             } else {
-                return new 
MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
-                        
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+                return new 
MicroPreclusteredGroupByPOperator(gby.getGroupByVarList());
             }
         }
 
@@ -236,22 +235,27 @@ public class SetAlgebricksPhysicalOperatorsRule 
implements IAlgebraicRewriteRule
             if (!hasIntermediateAgg) {
                 return null;
             }
-            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                            * physicalOptimizationConfig.getFrameSize());
+            return new ExternalGroupByPOperator(gby.getGroupByVarList());
         }
 
         @Override
         public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op, 
Boolean topLevelOp)
                 throws AlgebricksException {
-            JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
-            return op.getPhysicalOperator();
+            return visitAbstractBinaryJoinOperator(op, topLevelOp);
         }
 
         @Override
         public IPhysicalOperator 
visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
                 throws AlgebricksException {
+            return visitAbstractBinaryJoinOperator(op, topLevelOp);
+        }
+
+        protected IPhysicalOperator 
visitAbstractBinaryJoinOperator(AbstractBinaryJoinOperator op, Boolean 
topLevelOp)
+                throws AlgebricksException {
+            if (!topLevelOp) {
+                throw 
AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, 
op.getSourceLocation(),
+                        op.getOperatorTag().toString() + " (micro)");
+            }
             JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
             return op.getPhysicalOperator();
         }
@@ -270,9 +274,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
         public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean 
topLevelOp) throws AlgebricksException {
             ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
             if (topLevelOp) {
-                return new 
StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), 
oo.getTopK());
+                return new StableSortPOperator(oo.getTopK());
             } else {
-                return new 
MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort());
+                return new MicroStableSortPOperator();
             }
         }
 
@@ -470,8 +474,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
         }
 
         protected AbstractWindowPOperator createWindowPOperator(WindowOperator 
op) throws AlgebricksException {
-            return new WindowPOperator(op.getPartitionVarList(), 
op.getOrderColumnList(), false, false, false,
-                    
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            return new WindowPOperator(op.getPartitionVarList(), 
op.getOrderColumnList(), false, false, false);
         }
 
         // Physical operators for these operators must have been set already 
by rules that introduced them
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
new file mode 100644
index 0000000..4cecb07
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+/**
+ * Set memory requirements for all operators as follows:
+ * <ol>
+ * <li>First call {@link 
IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)}
+ *     to initialize each operator's {@link LocalMemoryRequirements} with 
minimal memory budget required by
+ *     that operator</li>
+ * <li>Then increase memory requirements for certain operators as specified by 
{@link PhysicalOptimizationConfig}</li>
+ * </ol>
+ */
+public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+        IPhysicalOperator physOp = op.getPhysicalOperator();
+        if (physOp.getLocalMemoryRequirements() != null) {
+            return false;
+        }
+        computeLocalMemoryRequirements(op, context, 
createMemoryRequirementsConfigurator(context));
+        return true;
+    }
+
+    private void computeLocalMemoryRequirements(AbstractLogicalOperator op, 
IOptimizationContext context,
+            ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) 
throws AlgebricksException {
+        IPhysicalOperator physOp = op.getPhysicalOperator();
+        if (physOp.getLocalMemoryRequirements() == null) {
+            physOp.createLocalMemoryRequirements(op);
+            if (physOp.getLocalMemoryRequirements() == null) {
+                throw new 
IllegalStateException(physOp.getOperatorTag().toString());
+            }
+            op.accept(memoryRequirementsVisitor, null);
+        }
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans nested = 
(AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : nested.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> root : p.getRoots()) {
+                    computeLocalMemoryRequirements((AbstractLogicalOperator) 
root.getValue(), context,
+                            createMemoryRequirementsConfigurator(context));
+                }
+            }
+        }
+        for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
+            computeLocalMemoryRequirements((AbstractLogicalOperator) 
opRef.getValue(), context,
+                    createMemoryRequirementsConfigurator(context));
+        }
+    }
+
+    protected ILogicalOperatorVisitor<Void, Void> 
createMemoryRequirementsConfigurator(IOptimizationContext context) {
+        return new MemoryRequirementsConfigurator(context);
+    }
+
+    protected static class MemoryRequirementsConfigurator implements 
ILogicalOperatorVisitor<Void, Void> {
+
+        protected final IOptimizationContext context;
+
+        protected final PhysicalOptimizationConfig physConfig;
+
+        protected MemoryRequirementsConfigurator(IOptimizationContext context) 
{
+            this.context = context;
+            this.physConfig = context.getPhysicalOptimizationConfig();
+        }
+
+        // helper methods
+
+        protected void setOperatorMemoryBudget(AbstractLogicalOperator op, int 
memBudgetInFrames)
+                throws AlgebricksException {
+            LocalMemoryRequirements memoryReqs = 
op.getPhysicalOperator().getLocalMemoryRequirements();
+            int minBudgetInFrames = memoryReqs.getMinMemoryBudgetInFrames();
+            if (memBudgetInFrames < minBudgetInFrames) {
+                throw 
AlgebricksException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, 
op.getSourceLocation(),
+                        op.getOperatorTag().toString(), memBudgetInFrames * 
physConfig.getFrameSize(),
+                        minBudgetInFrames * physConfig.getFrameSize());
+            }
+            memoryReqs.setMemoryBudgetInFrames(memBudgetInFrames);
+        }
+
+        // variable memory operators
+
+        @Override
+        public Void visitOrderOperator(OrderOperator op, Void arg) throws 
AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesExternalSort());
+            return null;
+        }
+
+        @Override
+        public Void visitGroupByOperator(GroupByOperator op, Void arg) throws 
AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesForGroupBy());
+            return null;
+        }
+
+        @Override
+        public Void visitWindowOperator(WindowOperator op, Void arg) throws 
AlgebricksException {
+            if (op.getPhysicalOperator().getOperatorTag() == 
PhysicalOperatorTag.WINDOW) {
+                setOperatorMemoryBudget(op, 
physConfig.getMaxFramesForWindow());
+            }
+            return null;
+        }
+
+        @Override
+        public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) 
throws AlgebricksException {
+            return visitJoinOperator(op, arg);
+        }
+
+        @Override
+        public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void 
arg) throws AlgebricksException {
+            return visitJoinOperator(op, arg);
+        }
+
+        protected Void visitJoinOperator(AbstractBinaryJoinOperator op, Void 
arg) throws AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesForJoin());
+            return null;
+        }
+
+        @Override
+        public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) 
throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op, arg);
+        }
+
+        @Override
+        public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator 
op, Void arg)
+                throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op, arg);
+        }
+
+        protected Void 
visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Void arg)
+                throws AlgebricksException {
+            IPhysicalOperator physOp = op.getPhysicalOperator();
+            if (physOp.getOperatorTag() == 
PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
+                    || physOp.getOperatorTag() == 
PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
+                setOperatorMemoryBudget(op, 
physConfig.getMaxFramesForTextSearch());
+            }
+            return null;
+        }
+
+        // fixed memory operators
+
+        @Override
+        public Void visitAggregateOperator(AggregateOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitRunningAggregateOperator(RunningAggregateOperator op, 
Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, 
Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitLimitOperator(LimitOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator 
op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitAssignOperator(AssignOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSelectOperator(SelectOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDelegateOperator(DelegateOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitProjectOperator(ProjectOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitReplicateOperator(ReplicateOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitScriptOperator(ScriptOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSubplanOperator(SubplanOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSinkOperator(SinkOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitUnionOperator(UnionAllOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitIntersectOperator(IntersectOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitUnnestOperator(UnnestOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, 
Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDistinctOperator(DistinctOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitExchangeOperator(ExchangeOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitWriteOperator(WriteOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDistributeResultOperator(DistributeResultOperator op, 
Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitWriteResultOperator(WriteResultOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator 
op, Void arg)
+                throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void 
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void 
arg)
+                throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitTokenizeOperator(TokenizeOperator op, Void arg) 
throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitForwardOperator(ForwardOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3042548..6687027 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -52,7 +52,7 @@ public class JoinUtils {
     }
 
     public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean 
topLevelOp,
-            IOptimizationContext context) throws AlgebricksException {
+            IOptimizationContext context) {
         if (!topLevelOp) {
             throw new IllegalStateException("Micro operator not implemented 
for: " + op.getOperatorTag());
         }
@@ -87,30 +87,28 @@ public class JoinUtils {
                 }
             }
         } else {
-            setNestedLoopJoinOp(op, context);
+            setNestedLoopJoinOp(op);
         }
     }
 
-    private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op, 
IOptimizationContext context) {
-        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST,
-                
context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
+    private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op) {
+        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST));
     }
 
     private static void setHashJoinOp(AbstractBinaryJoinOperator op, 
JoinPartitioningType partitioningType,
-            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IOptimizationContext context)
-            throws AlgebricksException {
+            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IOptimizationContext context) {
         op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), 
partitioningType, sideLeft, sideRight,
-                context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
                 
context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
                 
context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
                 context.getPhysicalOptimizationConfig().getFudgeFactor()));
-        if (partitioningType == JoinPartitioningType.BROADCAST) {
-            hybridToInMemHashJoin(op, context);
-        }
     }
 
-    private static void hybridToInMemHashJoin(AbstractBinaryJoinOperator op, 
IOptimizationContext context)
+    public static boolean hybridToInMemHashJoin(AbstractBinaryJoinOperator op, 
IOptimizationContext context)
             throws AlgebricksException {
+        HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) 
op.getPhysicalOperator();
+        if (hhj.getPartitioningType() != JoinPartitioningType.BROADCAST) {
+            return false;
+        }
         ILogicalOperator opBuild = op.getInputs().get(1).getValue();
         LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);
         ILogicalPropertiesVector v = 
context.getLogicalPropertiesVector(opBuild);
@@ -121,19 +119,19 @@ public class JoinUtils {
         }
         if (v != null) {
             int size2 = v.getMaxOutputFrames();
-            HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) 
op.getPhysicalOperator();
-            if (size2 > 0 && size2 * hhj.getFudgeFactor() <= 
hhj.getMemSizeInFrames()) {
+            int hhjMemSizeInFrames = 
hhj.getLocalMemoryRequirements().getMemoryBudgetInFrames();
+            if (size2 > 0 && size2 * hhj.getFudgeFactor() <= 
hhjMemSizeInFrames) {
                 if (loggerTraceEnabled) {
                     AlgebricksConfig.ALGEBRICKS_LOGGER
                             .trace("// HybridHashJoin inner branch " + 
opBuild.getOperatorTag() + " fits in memory\n");
                 }
                 // maintains the local properties on the probe side
-                op.setPhysicalOperator(
-                        new InMemoryHashJoinPOperator(hhj.getKind(), 
hhj.getPartitioningType(), hhj.getKeysLeftBranch(),
-                                hhj.getKeysRightBranch(), 
v.getNumberOfTuples() * 2, hhj.getMemSizeInFrames()));
+                op.setPhysicalOperator(new 
InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
+                        hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), 
v.getNumberOfTuples() * 2));
+                return true;
             }
         }
-
+        return false;
     }
 
     private static boolean isHashJoinCondition(ILogicalExpression e, 
Collection<LogicalVariable> inLeftAll,

Reply via email to