This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76badfbf42 [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators
76badfbf42 is described below

commit 76badfbf421026080571c60ed258e6879eba94ba
Author: Janhavi Tripurwar <[email protected]>
AuthorDate: Tue Jul 8 19:00:42 2025 +0530

    [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators
    
    Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Janhavi Tripurwar <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../physical/AbstractStableSortPOperator.java      | 60 ++++++++++++++++++++++
 .../physical/MicroStableSortPOperator.java         | 39 ++------------
 .../operators/physical/StableSortPOperator.java    | 44 ++--------------
 3 files changed, 69 insertions(+), 74 deletions(-)

diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 3162c1c223..840a33e66e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -32,8 +33,10 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -47,7 +50,14 @@ import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 
 public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
@@ -127,6 +137,56 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
         return orderProp;
     }
 
+    protected static class SortSetupData {
+        protected final int[] sortFields;
+        protected final IBinaryComparatorFactory[] comps;
+        protected final INormalizedKeyComputerFactory nkcf;
+        protected final RecordDescriptor recDescriptor;
+        protected final int maxNumberOfFrames;
+
+        protected SortSetupData(int[] sortFields, IBinaryComparatorFactory[] comps, INormalizedKeyComputerFactory nkcf,
+                RecordDescriptor recDescriptor, int maxNumberOfFrames) {
+            this.sortFields = sortFields;
+            this.comps = comps;
+            this.nkcf = nkcf;
+            this.recDescriptor = recDescriptor;
+            this.maxNumberOfFrames = maxNumberOfFrames;
+        }
+    }
+
+    protected static SortSetupData setupSortOperator(JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, OrderColumn[] sortColumns, LocalMemoryRequirements memReq)
+            throws AlgebricksException {
+
+        RecordDescriptor recDescriptor =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        int n = sortColumns.length;
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+        for (int i = 0; i < n; i++) {
+            OrderColumn oc = sortColumns[i];
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderOperator.IOrder.OrderKind order = oc.getOrder();
+
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderOperator.IOrder.OrderKind.ASC);
+            }
+
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderOperator.IOrder.OrderKind.ASC);
+        }
+
+        int maxNumberOfFrames = memReq.getMemoryBudgetInFrames();
+        return new SortSetupData(sortFields, comps, nkcf, recDescriptor, maxNumberOfFrames);
+    }
+
     @Override
     public String toString() {
         if (orderProp == null) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 403ca576f4..fede5061c9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -21,21 +21,11 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class MicroStableSortPOperator extends AbstractStableSortPOperator {
 
@@ -56,32 +46,11 @@ public class MicroStableSortPOperator extends AbstractStableSortPOperator {
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        RecordDescriptor recDescriptor =
-                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        int n = sortColumns.length;
-        int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-        int i = 0;
-        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        for (OrderColumn oc : sortColumns) {
-            LogicalVariable var = oc.getColumn();
-            sortFields[i] = opSchema.findVariable(var);
-            Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
-            }
+        SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, sortColumns, localMemoryRequirements);
 
-            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
-            i++;
-        }
-
-        int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
-        IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames);
-        builder.contributeMicroOperator(op, runtime, recDescriptor);
+        IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortSetupData.sortFields, sortSetupData.nkcf,
+                sortSetupData.comps, null, sortSetupData.maxNumberOfFrames);
+        builder.contributeMicroOperator(op, runtime, sortSetupData.recDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 93c5c3b15f..729ec3f972 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -21,21 +21,11 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -72,42 +62,18 @@ public class StableSortPOperator extends AbstractStableSortPOperator {
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recDescriptor =
-                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        int n = sortColumns.length;
-        int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, sortColumns, localMemoryRequirements);
 
-        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        int i = 0;
-        // TODO(ali): should refactor common code with micro sort op
-        for (OrderColumn oc : sortColumns) {
-            LogicalVariable var = oc.getColumn();
-            sortFields[i] = opSchema.findVariable(var);
-            Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
-            }
-            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
-            i++;
-        }
-
-        int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
         AbstractSorterOperatorDescriptor sortOpDesc;
         // topK == -1 means that a topK value is not provided.
         if (topK == -1) {
-            sortOpDesc =
-                    new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames, sortFields, nkcf, comps, recDescriptor);
+            sortOpDesc = new ExternalSortOperatorDescriptor(spec, sortSetupData.maxNumberOfFrames,
+                    sortSetupData.sortFields, sortSetupData.nkcf, sortSetupData.comps, sortSetupData.recDescriptor);
         } else {
             // Since topK value is provided, topK optimization is possible.
             // We call topKSorter instead of calling ExternalSortOperator.
-            sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK, sortFields, nkcf, comps,
-                    recDescriptor);
+            sortOpDesc = new TopKSorterOperatorDescriptor(spec, sortSetupData.maxNumberOfFrames, topK,
+                    sortSetupData.sortFields, sortSetupData.nkcf, sortSetupData.comps, sortSetupData.recDescriptor);
         }
         sortOpDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);

Reply via email to