This is an automated email from the ASF dual-hosted git repository.
mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76badfbf42 [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators
76badfbf42 is described below
commit 76badfbf421026080571c60ed258e6879eba94ba
Author: Janhavi Tripurwar <[email protected]>
AuthorDate: Tue Jul 8 19:00:42 2025 +0530
[NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators
Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Janhavi Tripurwar <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../physical/AbstractStableSortPOperator.java | 60 ++++++++++++++++++++++
.../physical/MicroStableSortPOperator.java | 39 ++------------
.../operators/physical/StableSortPOperator.java | 44 ++--------------
3 files changed, 69 insertions(+), 74 deletions(-)
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 3162c1c223..840a33e66e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -32,8 +33,10 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -47,7 +50,14 @@ import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import
org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
public abstract class AbstractStableSortPOperator extends
AbstractPhysicalOperator {
@@ -127,6 +137,56 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
return orderProp;
}
+ protected static class SortSetupData {
+ protected final int[] sortFields;
+ protected final IBinaryComparatorFactory[] comps;
+ protected final INormalizedKeyComputerFactory nkcf;
+ protected final RecordDescriptor recDescriptor;
+ protected final int maxNumberOfFrames;
+
+ protected SortSetupData(int[] sortFields, IBinaryComparatorFactory[]
comps, INormalizedKeyComputerFactory nkcf,
+ RecordDescriptor recDescriptor, int maxNumberOfFrames) {
+ this.sortFields = sortFields;
+ this.comps = comps;
+ this.nkcf = nkcf;
+ this.recDescriptor = recDescriptor;
+ this.maxNumberOfFrames = maxNumberOfFrames;
+ }
+ }
+
+ protected static SortSetupData setupSortOperator(JobGenContext context,
ILogicalOperator op,
+ IOperatorSchema opSchema, OrderColumn[] sortColumns,
LocalMemoryRequirements memReq)
+ throws AlgebricksException {
+
+ RecordDescriptor recDescriptor =
+
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
+ int n = sortColumns.length;
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider =
context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+ for (int i = 0; i < n; i++) {
+ OrderColumn oc = sortColumns[i];
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderOperator.IOrder.OrderKind order = oc.getOrder();
+
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type,
order == OrderOperator.IOrder.OrderKind.ASC);
+ }
+
+ IBinaryComparatorFactoryProvider bcfp =
context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() ==
OrderOperator.IOrder.OrderKind.ASC);
+ }
+
+ int maxNumberOfFrames = memReq.getMemoryBudgetInFrames();
+ return new SortSetupData(sortFields, comps, nkcf, recDescriptor,
maxNumberOfFrames);
+ }
+
@Override
public String toString() {
if (orderProp == null) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 403ca576f4..fede5061c9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -21,21 +21,11 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
public class MicroStableSortPOperator extends AbstractStableSortPOperator {
@@ -56,32 +46,11 @@ public class MicroStableSortPOperator extends AbstractStableSortPOperator {
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- RecordDescriptor recDescriptor =
-
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
- int n = sortColumns.length;
- int[] sortFields = new int[n];
- IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
- int i = 0;
- INormalizedKeyComputerFactoryProvider nkcfProvider =
context.getNormalizedKeyComputerFactoryProvider();
- INormalizedKeyComputerFactory nkcf = null;
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- for (OrderColumn oc : sortColumns) {
- LogicalVariable var = oc.getColumn();
- sortFields[i] = opSchema.findVariable(var);
- Object type = env.getVarType(var);
- OrderKind order = oc.getOrder();
- if (i == 0 && nkcfProvider != null && type != null) {
- nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type,
order == OrderKind.ASC);
- }
+ SortSetupData sortSetupData = setupSortOperator(context, op, opSchema,
sortColumns, localMemoryRequirements);
- IBinaryComparatorFactoryProvider bcfp =
context.getBinaryComparatorFactoryProvider();
- comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() ==
OrderKind.ASC);
- i++;
- }
-
- int maxNumberOfFrames =
localMemoryRequirements.getMemoryBudgetInFrames();
- IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields,
nkcf, comps, null, maxNumberOfFrames);
- builder.contributeMicroOperator(op, runtime, recDescriptor);
+ IPushRuntimeFactory runtime = new
MicroSortRuntimeFactory(sortSetupData.sortFields, sortSetupData.nkcf,
+ sortSetupData.comps, null, sortSetupData.maxNumberOfFrames);
+ builder.contributeMicroOperator(op, runtime,
sortSetupData.recDescriptor);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 93c5c3b15f..729ec3f972 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -21,21 +21,11 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -72,42 +62,18 @@ public class StableSortPOperator extends AbstractStableSortPOperator {
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RecordDescriptor recDescriptor =
-
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
- int n = sortColumns.length;
- int[] sortFields = new int[n];
- IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+ SortSetupData sortSetupData = setupSortOperator(context, op, opSchema,
sortColumns, localMemoryRequirements);
- INormalizedKeyComputerFactoryProvider nkcfProvider =
context.getNormalizedKeyComputerFactoryProvider();
- INormalizedKeyComputerFactory nkcf = null;
-
- IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- int i = 0;
- // TODO(ali): should refactor common code with micro sort op
- for (OrderColumn oc : sortColumns) {
- LogicalVariable var = oc.getColumn();
- sortFields[i] = opSchema.findVariable(var);
- Object type = env.getVarType(var);
- OrderKind order = oc.getOrder();
- if (i == 0 && nkcfProvider != null && type != null) {
- nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type,
order == OrderKind.ASC);
- }
- IBinaryComparatorFactoryProvider bcfp =
context.getBinaryComparatorFactoryProvider();
- comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() ==
OrderKind.ASC);
- i++;
- }
-
- int maxNumberOfFrames =
localMemoryRequirements.getMemoryBudgetInFrames();
AbstractSorterOperatorDescriptor sortOpDesc;
// topK == -1 means that a topK value is not provided.
if (topK == -1) {
- sortOpDesc =
- new ExternalSortOperatorDescriptor(spec,
maxNumberOfFrames, sortFields, nkcf, comps, recDescriptor);
+ sortOpDesc = new ExternalSortOperatorDescriptor(spec,
sortSetupData.maxNumberOfFrames,
+ sortSetupData.sortFields, sortSetupData.nkcf,
sortSetupData.comps, sortSetupData.recDescriptor);
} else {
// Since topK value is provided, topK optimization is possible.
// We call topKSorter instead of calling ExternalSortOperator.
- sortOpDesc = new TopKSorterOperatorDescriptor(spec,
maxNumberOfFrames, topK, sortFields, nkcf, comps,
- recDescriptor);
+ sortOpDesc = new TopKSorterOperatorDescriptor(spec,
sortSetupData.maxNumberOfFrames, topK,
+ sortSetupData.sortFields, sortSetupData.nkcf,
sortSetupData.comps, sortSetupData.recDescriptor);
}
sortOpDesc.setSourceLocation(op.getSourceLocation());
contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);