This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 19cf53341ea35ed3731cacb51208d22c0528b385 Author: Ali Alsuliman <[email protected]> AuthorDate: Fri Mar 29 00:43:15 2024 +0300 [ASTERIXDB-3369][FUN] Implement SQL median() - user model changes: no - storage format changes: no - interface changes: yes Details: Implement SQL median() function. Each local partition will generate a sorted run file. Those run files can be used to determine the median. - add the ability to generate unique ids at the joblet level in IHyracksJobletContext. - median() will use the unique ids as file ids when generating local run files. - references to the generated local run files are kept in JobFileState at the Joblet level. - use NetworkManager to receive file reading requests. - make PartitionManager keep track of file requests. - currently, median() will use the same sort memory budget configured for the cluster. - pass the sort memory to median() via the type inferer. - account for the memory required by median() when calculating the required capacity of a query. - median() used as a WINDOW function does not support ORDER BY or FRAME clauses. - refactor MaterializedPartition to PartitionFileReader. 
Change-Id: Id8d03f42d54b5ed4cf316c31f0b4cce9dd7c1dc0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18210 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../SqlppExpressionToPlanTranslator.java | 12 +- .../apache/asterix/api/common/APIFramework.java | 2 +- .../app/resource/OperatorResourcesComputer.java | 71 +++- .../org/apache/asterix/utils/ResourceUtils.java | 25 +- .../app/resource/PlanStagesGeneratorTest.java | 2 +- .../asterix/runtime/PartitionManagerTest.java | 2 +- .../asterix/common/config/CompilerProperties.java | 5 + .../common/config/OptimizationConfUtil.java | 2 +- .../asterix/om/functions/BuiltinFunctions.java | 53 +++ .../typecomputer/impl/LocalMedianTypeComputer.java | 44 +++ .../test/om/typecomputer/TypeComputerTest.java | 2 + .../scalar/ScalarSqlMedianAggregateDescriptor.java | 60 ++++ .../std/AbstractLocalMedianAggregateFunction.java | 176 ++++++++++ .../std/AbstractMedianAggregateFunction.java | 366 +++++++++++++++++++++ .../std/GlobalSqlMedianAggregateDescriptor.java | 54 +++ .../std/GlobalSqlMedianAggregateFunction.java | 51 +++ .../IntermediateSqlMedianAggregateDescriptor.java | 54 +++ .../IntermediateSqlMedianAggregateFunction.java | 50 +++ .../std/LocalSqlMedianAggregateDescriptor.java | 75 +++++ .../std/LocalSqlMedianAggregateFunction.java | 50 +++ .../std/SqlMedianAggregateDescriptor.java | 77 +++++ .../aggregates/std/SqlMedianAggregateFunction.java | 85 +++++ .../runtime/functions/FunctionCollection.java | 10 + .../runtime/functions/FunctionTypeInferers.java | 3 + .../hyracks/api/context/IHyracksJobletContext.java | 2 + ...utChannel.java => FileNetworkInputChannel.java} | 57 ++-- .../hyracks/comm/channels/NetworkInputChannel.java | 2 +- .../java/org/apache/hyracks/control/nc/Joblet.java | 7 + .../hyracks/control/nc/net/NetworkManager.java | 45 ++- .../control/nc/partitions/JobFileState.java | 92 ++++++ .../nc/partitions/MaterializedPartition.java | 43 +-- 
.../control/nc/partitions/PartitionFileReader.java | 82 +++++ .../nc/partitions/PartitionFileReaderUtil.java | 58 ++++ .../control/nc/partitions/PartitionManager.java | 23 ++ .../hyracks/dataflow/common/io/RunFileReader.java | 4 + .../hyracks/test/support/TestJobletContext.java | 7 + 36 files changed, 1648 insertions(+), 105 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java index 1d9c57bbf3..037aca3cf1 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java @@ -1219,10 +1219,14 @@ public class SqlppExpressionToPlanTranslator extends LangExpressionToPlanTransla boolean isWin = BuiltinFunctions.isWindowFunction(fi); boolean isWinAgg = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG); - boolean prohibitOrderClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, - BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE); - boolean prohibitFrameClause = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, - BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE); + boolean prohibitOrderClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, + BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE)) + || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, + BuiltinFunctions.AggregateFunctionProperty.NO_ORDER_CLAUSE)); + boolean prohibitFrameClause = (isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, + BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE)) + || (!isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, + BuiltinFunctions.AggregateFunctionProperty.NO_FRAME_CLAUSE)); boolean 
allowRespectIgnoreNulls = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, BuiltinFunctions.WindowFunctionProperty.ALLOW_RESPECT_IGNORE_NULLS); boolean allowFromFirstLast = isWin && BuiltinFunctions.builtinFunctionHasProperty(fi, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 99678dabc4..cb86e3589e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -332,7 +332,7 @@ public class APIFramework { final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec, nodeJobTracker, computationLocations); final IClusterCapacity jobRequiredCapacity = - ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf); + ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf, compilerProperties); spec.setRequiredClusterCapacity(jobRequiredCapacity); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java index a16ac84426..9e779229d7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java @@ -18,25 +18,42 @@ */ package org.apache.asterix.app.resource; +import org.apache.asterix.common.config.CompilerProperties; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import 
org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class OperatorResourcesComputer { + private static final Logger LOGGER = LogManager.getLogger(); public static final int MIN_OPERATOR_CORES = 1; private static final long MAX_BUFFER_PER_CONNECTION = 1L; private final int numComputationPartitions; private final long frameSize; + private final ExpressionMemoryComputer exprMemoryComputer; + private final CompilerProperties compilerProperties; - public OperatorResourcesComputer(int numComputationPartitions, long frameSize) { + public OperatorResourcesComputer(int numComputationPartitions, long frameSize, + CompilerProperties compilerProperties) { this.numComputationPartitions = numComputationPartitions; this.frameSize = frameSize; + this.exprMemoryComputer = new ExpressionMemoryComputer(); + this.compilerProperties = compilerProperties; } public int getOperatorRequiredCores(ILogicalOperator operator) { @@ -52,10 +69,22 @@ public class OperatorResourcesComputer { return getExchangeRequiredMemory((ExchangeOperator) operator); } else { IPhysicalOperator physOp = ((AbstractLogicalOperator) 
operator).getPhysicalOperator(); - return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements()); + return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements()) + + getOperatorExpressionsRequiredMemory(operator); } } + private long getOperatorExpressionsRequiredMemory(ILogicalOperator operator) { + exprMemoryComputer.reset(operator); + try { + operator.acceptExpressionTransform(exprMemoryComputer); + } catch (Throwable e) { + // ignore + LOGGER.warn("encountered error while computing operator expressions required memory", e); + } + return exprMemoryComputer.requiredMemory; + } + private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) { if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) { @@ -78,4 +107,42 @@ public class OperatorResourcesComputer { } return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize; } + + class ExpressionMemoryComputer implements ILogicalExpressionReferenceTransform { + + private long requiredMemory; + private ILogicalOperator operator; + + public ExpressionMemoryComputer() { + } + + @Override + public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException { + ILogicalExpression expr = expression.getValue(); + if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + AbstractFunctionCallExpression funExpr = (AbstractFunctionCallExpression) expr; + if (funExpr.getKind() == AbstractFunctionCallExpression.FunctionKind.AGGREGATE) { + if (isMedian(funExpr.getFunctionIdentifier())) { + requiredMemory += + (compilerProperties.getSortMemorySize() * numCompute(operator.getExecutionMode())); + } + } + } + return false; + } + + private int numCompute(AbstractLogicalOperator.ExecutionMode executionMode) { + return (executionMode == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED + || executionMode == AbstractLogicalOperator.ExecutionMode.LOCAL) ? numComputationPartitions : 1; + } + + private boolean isMedian(FunctionIdentifier funId) { + return BuiltinFunctions.LOCAL_SQL_MEDIAN.equals(funId) || BuiltinFunctions.SQL_MEDIAN.equals(funId); + } + + public void reset(ILogicalOperator op) { + requiredMemory = 0; + operator = op; + } + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java index ba11956b10..353bebfc69 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.asterix.app.resource.OperatorResourcesComputer; import org.apache.asterix.app.resource.PlanStage; import org.apache.asterix.app.resource.PlanStagesGenerator; +import org.apache.asterix.common.config.CompilerProperties; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -38,25 +39,28 @@ public class ResourceUtils { } /** - * Calculates the required cluster capacity from a given query plan, the computation locations, - * the operator memory budgets, and frame size. + * Calculates the required cluster capacity from a given query plan, the computation locations, the operator memory + * budgets, and frame size. * * @param plan, - * a given query plan. + * a given query plan. * @param computationLocations, - * the partitions for computation. + * the partitions for computation. * @param physicalOptimizationConfig, - * a PhysicalOptimizationConfig. + * a PhysicalOptimizationConfig. 
+ * @param compilerProperties * @return the required cluster capacity for executing the query. * @throws AlgebricksException - * if the query plan is malformed. + * if the query plan is malformed. */ public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan, AlgebricksAbsolutePartitionConstraint computationLocations, - PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException { + PhysicalOptimizationConfig physicalOptimizationConfig, CompilerProperties compilerProperties) + throws AlgebricksException { final int frameSize = physicalOptimizationConfig.getFrameSize(); final List<PlanStage> planStages = getStages(plan); - return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize); + return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize, + compilerProperties); } public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException { @@ -68,8 +72,9 @@ public class ResourceUtils { } public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations, - int frameSize) { - final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize); + int frameSize, CompilerProperties compilerProperties) { + final OperatorResourcesComputer computer = + new OperatorResourcesComputer(computationLocations, frameSize, compilerProperties); final IClusterCapacity clusterCapacity = new ClusterCapacity(); final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max() .orElseThrow(IllegalStateException::new); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java index b0de85ed5f..77d0e6058f 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java @@ -340,7 +340,7 @@ public class PlanStagesGeneratorTest { } } final IClusterCapacity clusterCapacity = - ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE); + ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE, null); Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java index 218ef9768e..f8dbaad8a4 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java @@ -60,7 +60,7 @@ public class PartitionManagerTest { public void failedJobPartitionRequestTest() throws Exception { final NodeControllerService nc1 = integrationUtil.ncs[0]; final NodeControllerService nc2 = integrationUtil.ncs[1]; - final JobId failedJob = new JobId(-1); + final JobId failedJob = new JobId(10); nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE); final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress(); final InetSocketAddress nc2Address = diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index 325c238504..5ed4621ccf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -286,4 +286,9 @@ public class CompilerProperties 
extends AbstractProperties { return AlgebricksConfig.QUERY_PLAN_SHAPE_DEFAULT; return queryPlanShapeMode; } + + public int getSortMemoryFrames() { + int numFrames = (int) getSortMemorySize() / getFrameSize(); + return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java index ae704758d7..93074696b6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java @@ -34,7 +34,7 @@ import org.apache.hyracks.control.common.config.OptionTypes; public class OptimizationConfUtil { - private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT; + public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT; private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY; private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN; private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW; diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index 4db68c347c..bb05ba6d49 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -95,6 +95,7 @@ import org.apache.asterix.om.typecomputer.impl.IfNullTypeComputer; import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer; import 
org.apache.asterix.om.typecomputer.impl.Int64ArrayToStringTypeComputer; import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer; +import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer; import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer; import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer; import org.apache.asterix.om.typecomputer.impl.MissingIfTypeComputer; @@ -551,6 +552,8 @@ public class BuiltinFunctions { new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-avg", 1); public static final FunctionIdentifier LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-avg", 1); + public static final FunctionIdentifier MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-median", 1); public static final FunctionIdentifier FIRST_ELEMENT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-first-element", 1); public static final FunctionIdentifier LOCAL_FIRST_ELEMENT = @@ -628,6 +631,8 @@ public class BuiltinFunctions { public static final FunctionIdentifier SCALAR_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sum", 1); public static final FunctionIdentifier SCALAR_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "max", 1); public static final FunctionIdentifier SCALAR_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "min", 1); + public static final FunctionIdentifier SCALAR_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "median", 1); public static final FunctionIdentifier SCALAR_FIRST_ELEMENT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "first-element", 1); public static final FunctionIdentifier SCALAR_LOCAL_FIRST_ELEMENT = @@ -802,6 +807,14 @@ public class BuiltinFunctions { new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-avg", 1); public static final FunctionIdentifier LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"agg-local-sql-avg", 1); + public static final FunctionIdentifier SQL_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-median", 1); + public static final FunctionIdentifier LOCAL_SQL_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sql-median", 1); + public static final FunctionIdentifier INTERMEDIATE_SQL_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-sql-median", 1); + public static final FunctionIdentifier GLOBAL_SQL_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-sql-median", 1); public static final FunctionIdentifier SQL_STDDEV_SAMP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sql-stddev_samp", 1); public static final FunctionIdentifier INTERMEDIATE_SQL_STDDEV_SAMP = @@ -869,6 +882,8 @@ public class BuiltinFunctions { new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-max", 1); public static final FunctionIdentifier SCALAR_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-min", 1); + public static final FunctionIdentifier SCALAR_SQL_MEDIAN = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-median", 1); public static final FunctionIdentifier SCALAR_SQL_STDDEV_SAMP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-stddev_samp", 1); public static final FunctionIdentifier SCALAR_SQL_STDDEV_POP = @@ -2143,6 +2158,13 @@ public class BuiltinFunctions { addPrivateFunction(GLOBAL_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true); addFunction(SCALAR_SQL_UNION_MBR, ARectangleTypeComputer.INSTANCE, true); + addFunction(SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true); + addFunction(SCALAR_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true); + addFunction(SCALAR_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true); + addPrivateFunction(LOCAL_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true); + addPrivateFunction(INTERMEDIATE_SQL_MEDIAN, LocalMedianTypeComputer.INSTANCE, true); + 
addPrivateFunction(GLOBAL_SQL_MEDIAN, NullableDoubleTypeComputer.INSTANCE, true); + addPrivateFunction(SERIAL_AVG, NullableDoubleTypeComputer.INSTANCE, true); addPrivateFunction(SERIAL_COUNT, AInt64TypeComputer.INSTANCE, true); addPrivateFunction(SERIAL_GLOBAL_AVG, NullableDoubleTypeComputer.INSTANCE, true); @@ -3169,6 +3191,25 @@ public class BuiltinFunctions { addDistinctAgg(SQL_SUM_DISTINCT, SQL_SUM); addScalarAgg(SQL_SUM_DISTINCT, SCALAR_SQL_SUM_DISTINCT); + // SQL MEDIAN + addAgg(SQL_MEDIAN); + addAgg(LOCAL_SQL_MEDIAN); + addAgg(GLOBAL_SQL_MEDIAN); + + addLocalAgg(SQL_MEDIAN, LOCAL_SQL_MEDIAN); + + addIntermediateAgg(SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN); + addIntermediateAgg(LOCAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN); + addIntermediateAgg(GLOBAL_SQL_MEDIAN, INTERMEDIATE_SQL_MEDIAN); + + addGlobalAgg(SQL_MEDIAN, GLOBAL_SQL_MEDIAN); + + addScalarAgg(MEDIAN, SCALAR_MEDIAN); + addScalarAgg(SQL_MEDIAN, SCALAR_SQL_MEDIAN); + + registerAggFunctionProperties(SCALAR_SQL_MEDIAN, AggregateFunctionProperty.NO_FRAME_CLAUSE, + AggregateFunctionProperty.NO_ORDER_CLAUSE); + // SPATIAL AGGREGATES addAgg(ST_UNION_AGG); @@ -3203,6 +3244,13 @@ public class BuiltinFunctions { interface BuiltinFunctionProperty { } + public enum AggregateFunctionProperty implements BuiltinFunctionProperty { + /** Whether the order clause is prohibited */ + NO_ORDER_CLAUSE, + /** Whether the frame clause is prohibited */ + NO_FRAME_CLAUSE + } + public enum WindowFunctionProperty implements BuiltinFunctionProperty { /** Whether the order clause is prohibited */ NO_ORDER_CLAUSE, @@ -3365,6 +3413,11 @@ public class BuiltinFunctions { registeredFunctions.put(functionInfo.getFunctionIdentifier(), functionInfo); } + private static <T extends Enum<T> & BuiltinFunctionProperty> void registerAggFunctionProperties( + FunctionIdentifier fid, AggregateFunctionProperty... 
properties) { + registerFunctionProperties(fid, AggregateFunctionProperty.class, properties); + } + private static <T extends Enum<T> & BuiltinFunctionProperty> void registerFunctionProperties(FunctionIdentifier fid, Class<T> propertyClass, T[] properties) { if (properties == null) { diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java new file mode 100644 index 0000000000..3411ef4d32 --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/LocalMedianTypeComputer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.om.typecomputer.impl; + +import org.apache.asterix.om.typecomputer.base.IResultTypeComputer; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; + +public class LocalMedianTypeComputer implements IResultTypeComputer { + + public static final LocalMedianTypeComputer INSTANCE = new LocalMedianTypeComputer(); + + public static final ARecordType REC_TYPE = new ARecordType(null, + new String[] { "count", "handle", "address", "port" }, + new IAType[] { BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.AINT32 }, false); + + @Override + public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, + IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException { + return REC_TYPE; + } +} diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java index a85798199c..07f2021984 100644 --- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java +++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/test/om/typecomputer/TypeComputerTest.java @@ -36,6 +36,7 @@ import org.apache.asterix.om.typecomputer.impl.BooleanOrMissingTypeComputer; import org.apache.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType; import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer; import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer; +import 
org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer; import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer; import org.apache.asterix.om.typecomputer.impl.NullableDoubleTypeComputer; import org.apache.asterix.om.typecomputer.impl.OpenRecordConstructorResultType; @@ -190,6 +191,7 @@ public class TypeComputerTest { differentBehaviorFunctions.add(RecordRemoveFieldsTypeComputer.class.getSimpleName()); differentBehaviorFunctions.add(ClosedRecordConstructorResultType.class.getSimpleName()); differentBehaviorFunctions.add(LocalAvgTypeComputer.class.getSimpleName()); + differentBehaviorFunctions.add(LocalMedianTypeComputer.class.getSimpleName()); differentBehaviorFunctions.add(BooleanOnlyTypeComputer.class.getSimpleName()); // differentBehaviorFunctions.add("AMissingTypeComputer"); // TODO What type computer is this? differentBehaviorFunctions.add(NullableDoubleTypeComputer.class.getSimpleName()); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java new file mode 100644 index 0000000000..5c83a5e9e5 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/scalar/ScalarSqlMedianAggregateDescriptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.scalar; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class ScalarSqlMedianAggregateDescriptor extends AbstractScalarAggregateDescriptor { + + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new ScalarSqlMedianAggregateDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.MEDIAN_MEMORY; + } + }; + + private ScalarSqlMedianAggregateDescriptor() { + super(SqlMedianAggregateDescriptor.FACTORY); + } + + @Override + public void setImmutableStates(Object... 
states) { + super.setImmutableStates(states); + aggFuncDesc.setImmutableStates(states); + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SCALAR_SQL_MEDIAN; + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java new file mode 100644 index 0000000000..8ff7ed5c9b --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractLocalMedianAggregateFunction.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.util.List;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+/**
+ * Base class for the first (local) step of the median aggregates.
+ * <p>
+ * Every input value in the numeric type domain is converted to a double and fed to an external sort; other values
+ * are skipped. At the end of a group the sorted values are materialized as a single run file, a
+ * {@link JobFileState} referencing it is stored as the task state object, and a partial result describing the file
+ * (value count, file id, node address and port) is emitted.
+ */
+public abstract class AbstractLocalMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    protected final ExternalSortRunGenerator runsGenerator;
+    protected final IFrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
+    // sort memory budget (in frames) used both for run generation and for merging spilled runs
+    private final int numFrames;
+    // created lazily: only needed when the sort spills more than one run
+    private ExternalSortRunMerger runsMerger;
+
+    public AbstractLocalMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc, int numFrames) throws HyracksDataException {
+        super(args, context, sourceLoc);
+        this.numFrames = numFrames;
+        appender = new FrameTupleAppender(frame);
+        tupleBuilder = new ArrayTupleBuilder(1);
+        runsGenerator = new ExternalSortRunGenerator(context.getTaskContext(), new int[] { 0 },
+                new INormalizedKeyComputerFactory[] { doubleNkComputerFactory },
+                new IBinaryComparatorFactory[] { doubleComparatorFactory }, recordDesc, Algorithm.MERGE_SORT,
+                EnumFreeSlotPolicy.LAST_FIT, numFrames, Integer.MAX_VALUE);
+    }
+
+    /** Resets per-group state and re-opens the run generator with an empty sorter. */
+    @Override
+    public void init() throws HyracksDataException {
+        super.init();
+        appender.reset(frame, true);
+        runsGenerator.open();
+        runsGenerator.getSorter().reset();
+    }
+
+    /**
+     * Evaluates the argument for {@code tuple}; numeric values are counted, converted to double and appended to
+     * the sort input. Non-numeric values are ignored.
+     */
+    protected void processDataValue(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] data = inputVal.getByteArray();
+        int start = inputVal.getStartOffset();
+        ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[start]);
+        if (ATypeHierarchy.getTypeDomain(tag) == ATypeHierarchy.Domain.NUMERIC) {
+            count++;
+            aDouble.setValue(ATypeHierarchy.getDoubleValue(MEDIAN, 0, data, start));
+            tupleBuilder.reset();
+            tupleBuilder.addField(doubleSerde, aDouble);
+            FrameUtils.appendToWriter(runsGenerator, appender, tupleBuilder.getFieldEndOffsets(),
+                    tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+        }
+    }
+
+    /**
+     * Completes the sort, materializes the sorted values into one run file and emits the partial result record
+     * pointing at it. With no values, an "empty" partial result (file id -1, empty address, port -1) is emitted.
+     */
+    protected void finishLocalPartial(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+        if (appender.getTupleCount() > 0) {
+            appender.write(runsGenerator, true);
+        }
+        // close to sort the in-memory data or write out sorted data to run files
+        runsGenerator.close();
+
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+        FileReference fileRef = writeRunFile(taskCtx, jobletCtx);
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(fileRef, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    /**
+     * Produces exactly one sorted run file: in-memory data is flushed to a new file, a single spilled run is
+     * reused as-is, and multiple spilled runs are merged into a new file.
+     */
+    private FileReference writeRunFile(IHyracksTaskContext taskCtx, IHyracksJobletContext jobletCtx)
+            throws HyracksDataException {
+        List<GeneratedRunFileReader> runs = runsGenerator.getRuns();
+        FileReference managedFile;
+        if (runs.isEmpty()) {
+            managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+            writeMemoryDataToRunFile(managedFile, taskCtx);
+        } else if (runs.size() == 1) {
+            managedFile = runs.get(0).getFile();
+        } else {
+            managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+            mergeRunsToRunFile(managedFile, taskCtx, runs);
+        }
+        return managedFile;
+    }
+
+    /** Merges the spilled {@code runs} into {@code managedFile}. */
+    private void mergeRunsToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx,
+            List<GeneratedRunFileReader> runs) throws HyracksDataException {
+        createOrResetRunsMerger(runs);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        IFrameWriter wrappingWriter = runsMerger.prepareFinalMergeResultWriter(runFileWriter);
+        try {
+            wrappingWriter.open();
+            runsMerger.process(wrappingWriter);
+        } catch (Throwable th) {
+            // IFrameWriter protocol: report the failure before close()
+            wrappingWriter.fail();
+            throw HyracksDataException.create(th);
+        } finally {
+            wrappingWriter.close();
+        }
+    }
+
+    /** Flushes the sorter's in-memory (already sorted) data into {@code managedFile}. */
+    protected RunFileWriter writeMemoryDataToRunFile(FileReference managedFile, IHyracksTaskContext taskCtx)
+            throws HyracksDataException {
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        try {
+            runFileWriter.open();
+            ISorter sorter = runsGenerator.getSorter();
+            if (sorter.hasRemaining()) {
+                sorter.flush(runFileWriter);
+            }
+        } catch (Throwable th) {
+            // IFrameWriter protocol: report the failure before close()
+            runFileWriter.fail();
+            throw HyracksDataException.create(th);
+        } finally {
+            runFileWriter.close();
+        }
+        return runFileWriter;
+    }
+
+    /** Creates the runs merger on first use, otherwise re-targets the existing one at {@code runs}. */
+    private void createOrResetRunsMerger(List<GeneratedRunFileReader> runs) {
+        if (runsMerger == null) {
+            IBinaryComparator[] comparators =
+                    new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() };
+            INormalizedKeyComputer nmkComputer = doubleNkComputerFactory.createNormalizedKeyComputer();
+            runsMerger = new ExternalSortRunMerger(ctx.getTaskContext(), runs, new int[] { 0 }, comparators,
+                    nmkComputer, recordDesc, numFrames, Integer.MAX_VALUE);
+        } else {
+            runsMerger.reset(runs);
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
new file mode 100644
index 0000000000..a695c231d8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.NormalizedKeyComputerFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.typecomputer.impl.LocalMedianTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.comm.channels.FileNetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.control.nc.partitions.JobFileState;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+/**
+ * Shared machinery for the SQL median aggregates.
+ * <p>
+ * A partial result is a record with four fields: the number of values, the id of a sorted run file, and the
+ * address and port of the node holding that file. Non-final steps merge the run files referenced by incoming
+ * partial results (read over the network) into a new local run file. The final step merges them and reads the
+ * middle value(s).
+ */
+public abstract class AbstractMedianAggregateFunction extends AbstractAggregateFunction {
+
+    protected static final String MEDIAN = "median";
+    // field ids of the partial result record
+    private static final int COUNT_FIELD_ID = 0;
+    private static final int HANDLE_FIELD_ID = 1;
+    private static final int ADDRESS_FIELD_ID = 2;
+    private static final int PORT_FIELD_ID = 3;
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AString> stringSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> longSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt32> intSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    protected final ISerializerDeserializer<ADouble> doubleSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+    protected final IBinaryComparatorFactory doubleComparatorFactory =
+            BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(ATypeTag.DOUBLE, true);
+    protected final INormalizedKeyComputerFactory doubleNkComputerFactory =
+            NormalizedKeyComputerFactoryProvider.INSTANCE.getNormalizedKeyComputerFactory(BuiltinType.ADOUBLE, true);
+    // layout of the sorted run files: one double field per tuple
+    protected final RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { doubleSerde },
+            new ITypeTraits[] { TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ADOUBLE) });
+
+    protected final AMutableString aString = new AMutableString("");
+    protected final AMutableInt64 aInt64 = new AMutableInt64(0);
+    protected final AMutableInt32 aInt32 = new AMutableInt32(0);
+    protected final AMutableDouble aDouble = new AMutableDouble(0);
+    protected final IPointable inputVal = new VoidPointable();
+    private final FrameTupleReference ftr = new FrameTupleReference();
+    private final FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+    protected final List<IFrameReader> readers = new ArrayList<>();
+
+    protected final IScalarEvaluator eval;
+    protected final IEvaluatorContext ctx;
+    protected final IFrame frame;
+    // number of values covered by the current group
+    protected long count;
+    // frames handed to the runs merger; cached and reused across groups
+    private List<IFrame> inFrames;
+    private List<PartialResult> partialResults;
+    private RecordBuilder recBuilder;
+
+    public AbstractMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(sourceLoc);
+        ctx = context;
+        eval = args[0].createScalarEvaluator(context);
+        frame = new VSizeFrame(context.getTaskContext());
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        if (partialResults == null) {
+            partialResults = new ArrayList<>();
+        }
+        if (recBuilder == null) {
+            recBuilder = new RecordBuilder();
+            recBuilder.reset(LocalMedianTypeComputer.REC_TYPE);
+        }
+        count = 0;
+        partialResults.clear();
+        recBuilder.init();
+    }
+
+    /**
+     * Reads one partial result record and remembers where its run file lives. Records whose count is 0 are
+     * ignored.
+     *
+     * @throws UnsupportedItemTypeException if the argument is not a record
+     */
+    protected void processPartialResults(IFrameTupleReference tuple) throws HyracksDataException {
+        eval.evaluate(tuple, inputVal);
+        byte[] serBytes = inputVal.getByteArray();
+        int offset = inputVal.getStartOffset();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
+        if (typeTag == ATypeTag.OBJECT) {
+            long handleCount = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID, 0, false));
+            if (handleCount == 0) {
+                return;
+            }
+            count += handleCount;
+
+            long fileId = AInt64SerializerDeserializer.getLong(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, HANDLE_FIELD_ID, 0, false));
+
+            String address = UTF8StringUtil.toString(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, ADDRESS_FIELD_ID, 0, false));
+
+            int port = AInt32SerializerDeserializer.getInt(serBytes,
+                    ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, PORT_FIELD_ID, 0, false));
+
+            partialResults.add(new PartialResult(fileId, handleCount, address, port));
+        } else {
+            throw new UnsupportedItemTypeException(sourceLoc, MEDIAN, serBytes[offset]);
+        }
+    }
+
+    /**
+     * Merges all referenced run files into a new local run file and emits a partial result pointing at it.
+     * With no values, an "empty" partial result (file id -1, empty address, port -1) is emitted.
+     */
+    protected void finishPartialResult(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            setPartialResult(result, -1, "", -1);
+            return;
+        }
+
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        RunMergingFrameReader merger = createRunsMergingFrameReader();
+        FileReference managedFile = jobletCtx.createManagedWorkspaceFile(MEDIAN);
+        RunFileWriter runFileWriter = new RunFileWriter(managedFile, taskCtx.getIoManager());
+        merger.open();
+        // the merger must be closed even if opening, writing to, or closing the run file fails
+        try {
+            try {
+                runFileWriter.open();
+                while (merger.nextFrame(frame)) {
+                    runFileWriter.nextFrame(frame.getBuffer());
+                }
+            } catch (Throwable th) {
+                // IFrameWriter protocol: report the failure before close()
+                runFileWriter.fail();
+                throw HyracksDataException.create(th);
+            } finally {
+                runFileWriter.close();
+            }
+        } finally {
+            merger.close();
+        }
+
+        NetworkAddress netAddress = ((NodeControllerService) jobletCtx.getServiceContext().getControllerService())
+                .getNetworkManager().getPublicNetworkAddress();
+
+        long fileId = jobletCtx.nextUniqueId();
+        taskCtx.setStateObject(new JobFileState(managedFile, jobletCtx.getJobId(), fileId));
+        setPartialResult(result, fileId, netAddress.getAddress(), netAddress.getPort());
+    }
+
+    /** Emits the median as a double, or null when no values were seen. */
+    protected void finishFinalResult(IPointable result) throws HyracksDataException {
+        if (count == 0) {
+            PointableHelper.setNull(result);
+            return;
+        }
+        try {
+            double medianVal = findMedian();
+            resultStorage.reset();
+            aDouble.setValue(medianVal);
+            doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+            result.set(resultStorage);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private double findMedian() throws HyracksDataException {
+        RunMergingFrameReader merger = createRunsMergingFrameReader();
+        return getMedian(merger);
+    }
+
+    /**
+     * Opens one network input channel per collected partial result and returns a reader that merges them in
+     * ascending order. The returned reader is not yet opened.
+     */
+    protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException {
+        IHyracksTaskContext taskCtx = ctx.getTaskContext();
+        IHyracksJobletContext jobletCtx = taskCtx.getJobletContext();
+        INCServiceContext serviceCtx = jobletCtx.getServiceContext();
+        NetworkManager netManager = ((NodeControllerService) serviceCtx.getControllerService()).getNetworkManager();
+        List<IFrame> mergeFrames = getInFrames(partialResults.size(), taskCtx);
+        readers.clear();
+        for (PartialResult partialResult : partialResults) {
+            IFrameReader inputChannelReader = createInputChannel(netManager, taskCtx,
+                    new NetworkAddress(partialResult.address, partialResult.port), jobletCtx.getJobId().getId(),
+                    partialResult.fileId);
+            readers.add(inputChannelReader);
+        }
+        return new RunMergingFrameReader(taskCtx, readers, mergeFrames, new int[] { 0 },
+                new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() },
+                doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc);
+    }
+
+    /**
+     * Streams the merged, sorted values and returns the middle value (odd count) or the mean of the two middle
+     * values (even count). Opens and always closes {@code merger}.
+     */
+    private double getMedian(RunMergingFrameReader merger) throws HyracksDataException {
+        boolean isOdd = count % 2 != 0;
+        // 0-based position of the (lower) median; for an even count the upper median is the next tuple
+        long medianPosition = isOdd ? count / 2 : (count - 1) / 2;
+        long currentTupleCount = 0;
+        double medianVal = -1;
+        merger.open();
+        try {
+            while (merger.nextFrame(frame)) {
+                fta.reset(frame.getBuffer());
+                int tupleCount = fta.getTupleCount();
+                if (currentTupleCount + tupleCount > medianPosition) {
+                    int firstMedian = (int) (medianPosition - currentTupleCount);
+                    medianVal = readDouble(firstMedian);
+                    if (!isOdd) {
+                        double secondMedianVal;
+                        if (firstMedian + 1 < tupleCount) {
+                            // second median is in the same frame
+                            secondMedianVal = readDouble(firstMedian + 1);
+                        } else {
+                            // second median is in the next frame
+                            if (!merger.nextFrame(frame)) {
+                                throw new IllegalStateException(
+                                        "median: input ended before the second middle value; expected " + count
+                                                + " values");
+                            }
+                            fta.reset(frame.getBuffer());
+                            secondMedianVal = readDouble(0);
+                        }
+                        medianVal = (secondMedianVal + medianVal) / 2;
+                    }
+                    break;
+                }
+                currentTupleCount += tupleCount;
+            }
+        } finally {
+            merger.close();
+        }
+        return medianVal;
+    }
+
+    /** Decodes the double stored in field 0 of tuple {@code tupleIndex} of the frame currently held by fta. */
+    private double readDouble(int tupleIndex) {
+        ftr.reset(fta, tupleIndex);
+        // + 1 skips the leading byte written by the tagged double serde (presumably the type tag)
+        return ADoubleSerializerDeserializer.getDouble(ftr.getFieldData(0), ftr.getFieldStart(0) + 1);
+    }
+
+    /** Serializes the partial result record (count, file id, address, port) into {@code result}. */
+    protected void setPartialResult(IPointable result, long fileId, String address, int port)
+            throws HyracksDataException {
+        try {
+            resultStorage.reset();
+            aInt64.setValue(count);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(COUNT_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt64.setValue(fileId);
+            longSerde.serialize(aInt64, resultStorage.getDataOutput());
+            recBuilder.addField(HANDLE_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aString.setValue(address);
+            stringSerde.serialize(aString, resultStorage.getDataOutput());
+            recBuilder.addField(ADDRESS_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            aInt32.setValue(port);
+            intSerde.serialize(aInt32, resultStorage.getDataOutput());
+            recBuilder.addField(PORT_FIELD_ID, resultStorage);
+
+            resultStorage.reset();
+            recBuilder.write(resultStorage.getDataOutput(), true);
+            result.set(resultStorage);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Returns the cached frame list, resetting its first {@code size} frames and allocating new frames until it
+     * holds at least {@code size} of them.
+     */
+    protected List<IFrame> getInFrames(int size, IHyracksTaskContext taskCtx) throws HyracksDataException {
+        if (inFrames == null) {
+            inFrames = new ArrayList<>(size);
+        }
+        int k = 0;
+        for (int inFramesSize = inFrames.size(); k < size && k < inFramesSize; k++) {
+            inFrames.get(k).reset();
+        }
+        for (; k < size; k++) {
+            inFrames.add(new VSizeFrame(taskCtx));
+        }
+        return inFrames;
+    }
+
+    /** Opens a network channel that reads run file {@code fileId} of job {@code jobId} from the given node. */
+    private IFrameReader createInputChannel(NetworkManager netManager, IHyracksTaskContext taskContext,
+            NetworkAddress networkAddress, long jobId, long fileId) throws HyracksDataException {
+        FileNetworkInputChannel inputChannel =
+                new FileNetworkInputChannel(netManager, getSocketAddress(networkAddress), jobId, fileId);
+        InputChannelFrameReader channelFrameReader = new InputChannelFrameReader(inputChannel);
+        inputChannel.registerMonitor(channelFrameReader);
+        inputChannel.open(taskContext);
+        return channelFrameReader;
+    }
+
+    private static SocketAddress getSocketAddress(NetworkAddress netAddress) throws HyracksDataException {
+        try {
+            return new InetSocketAddress(InetAddress.getByAddress(netAddress.lookupIpAddress()), netAddress.getPort());
+        } catch (UnknownHostException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** One incoming partial result: a sorted run file of {@code count} values held by node address:port. */
+    public static class PartialResult {
+
+        long fileId;
+        String address;
+        int port;
+        long count;
+
+        PartialResult(long fileId, long count, String address, int port) {
+            this.fileId = fileId;
+            this.count = count;
+            this.address = address;
+            this.port = port;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000000..faf5272889
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateDescriptor.java
@@ -0,0
+1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor of the global median step; its evaluators are {@link GlobalSqlMedianAggregateFunction} instances.
+ */
+public class GlobalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = GlobalSqlMedianAggregateDescriptor::new;
+
+    /** Returns a factory building one {@link GlobalSqlMedianAggregateFunction} per evaluator context. */
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new GlobalSqlMedianAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GLOBAL_SQL_MEDIAN;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
new file mode 100644
index 0000000000..c9ee8d8325
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlMedianAggregateFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Global median step: consumes partial result records and, on {@link #finish}, produces the median value.
+ */
+public class GlobalSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public GlobalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    /** Records one incoming partial result. */
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple);
+    }
+
+    /** Emits a partial result record referencing the merged run file. */
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+
+    /** Emits the median (a double), or null when no values were seen. */
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishFinalResult(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000000..532cb5f6ea
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor of the intermediate median step; its evaluators are {@link IntermediateSqlMedianAggregateFunction}
+ * instances.
+ */
+public class IntermediateSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = IntermediateSqlMedianAggregateDescriptor::new;
+
+    /** Returns a factory building one {@link IntermediateSqlMedianAggregateFunction} per evaluator context. */
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx)
+                    throws HyracksDataException {
+                return new IntermediateSqlMedianAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.INTERMEDIATE_SQL_MEDIAN;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
new file mode 100644
index 0000000000..d78c17fd6b
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlMedianAggregateFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.aggregates.std;
+
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Intermediate median step: consumes partial result records and always emits a partial result record, from both
+ * {@link #finish} and {@link #finishPartial}.
+ */
+public class IntermediateSqlMedianAggregateFunction extends AbstractMedianAggregateFunction {
+
+    public IntermediateSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    /** Records one incoming partial result. */
+    @Override
+    public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        processPartialResults(tuple);
+    }
+
+    /** Emits a partial result record referencing the merged run file. */
+    @Override
+    public void finishPartial(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+
+    /** Same as {@link #finishPartial}: this step never produces the final median value. */
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        finishPartialResult(result);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
new file mode 100644
index 0000000000..50f36ceee5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +/** Descriptor for the local stage of SQL median() (LOCAL_SQL_MEDIAN). Its type inferer, FunctionTypeInferers.MEDIAN_MEMORY, passes the compiler's sort-memory frame count through setImmutableStates(), and that count is handed to every LocalSqlMedianAggregateFunction this descriptor creates. */ +public class LocalSqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new LocalSqlMedianAggregateDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.MEDIAN_MEMORY; + } + }; + /** Sort-memory budget in frames, received through setImmutableStates(); 0 until set. */ + private int numFrames = 0; + /** Expects states[0] to be the frame count as an Integer (it is cast with (int)). */ + @Override + public void setImmutableStates(Object...
states) { + this.numFrames = (int) states[0]; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.LOCAL_SQL_MEDIAN; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx) + throws HyracksDataException { + return new LocalSqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames); + } + }; + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java new file mode 100644 index 0000000000..5400ca2df8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlMedianAggregateFunction.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** Local stage of SQL median(): step() feeds each input value to processDataValue(), and both finish() and finishPartial() emit the local partial result via finishLocalPartial(). */ +public class LocalSqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction { + + public LocalSqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context, + SourceLocation sourceLoc, int numFrames) throws HyracksDataException { + super(args, context, sourceLoc, numFrames); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValue(tuple); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishLocalPartial(result); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishLocalPartial(result); + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java new file mode 100644 index 0000000000..b98e389b19 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateDescriptor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +/** Descriptor for the single-step SQL median() aggregate (SQL_MEDIAN). The sort-memory frame count arrives through setImmutableStates() from FunctionTypeInferers.MEDIAN_MEMORY and is passed to every SqlMedianAggregateFunction this descriptor creates. */ +public class SqlMedianAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new SqlMedianAggregateDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.MEDIAN_MEMORY; + } + }; + /** Sort-memory budget in frames, received through setImmutableStates(); 0 until set. */ + private int numFrames = 0; + /** Expects states[0] to be the frame count as an Integer (it is cast with (int)). */ + @Override + public void setImmutableStates(Object...
states) { + this.numFrames = (int) states[0]; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SQL_MEDIAN; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IEvaluatorContext ctx) + throws HyracksDataException { + return new SqlMedianAggregateFunction(args, ctx, sourceLoc, numFrames); + } + }; + } + +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java new file mode 100644 index 0000000000..1dbbfe6fe0 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlMedianAggregateFunction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import java.util.List; + +import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; +import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader; +/** Single-step SQL median(): values are buffered via processDataValue() as in the local stage, but finish() sorts them and produces the final result itself instead of emitting a partial result. */ +public class SqlMedianAggregateFunction extends AbstractLocalMedianAggregateFunction { + + public SqlMedianAggregateFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context, + SourceLocation sourceLoc, int numFrames) throws HyracksDataException { + super(args, context, sourceLoc, numFrames); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValue(tuple); + } + /** Flushes buffered tuples to the runs generator, closes it so the data is sorted (in memory or into run files), then delegates to finishFinalResult(). */ + @Override + public void finish(IPointable result) throws HyracksDataException { + if (appender.getTupleCount() > 0) { + appender.write(runsGenerator, true); + } + // close to sort the in-memory data or write out sorted data to run files + runsGenerator.close(); + super.finishFinalResult(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishLocalPartial(result); + } + /** Returns a reader that merges the sorted data on field 0 using the double comparator; if no run files were generated, the in-memory data is first written to a delete-on-close workspace run file (see the TODO below). */ + @Override + protected RunMergingFrameReader createRunsMergingFrameReader() throws HyracksDataException { + IHyracksTaskContext taskCtx = ctx.getTaskContext(); + List<GeneratedRunFileReader> runs = runsGenerator.getRuns(); +
readers.clear(); + if (runs.isEmpty()) { + //TODO: no need to write memory to run file, should just read the sorted data out of the sorter + FileReference managedFile = taskCtx.createManagedWorkspaceFile(MEDIAN); + RunFileWriter runFileWriter = writeMemoryDataToRunFile(managedFile, taskCtx); + GeneratedRunFileReader deleteOnCloseReader = runFileWriter.createDeleteOnCloseReader(); + readers.add(deleteOnCloseReader); + } else { + readers.addAll(runs); + } + + List<IFrame> inFrames = getInFrames(readers.size(), taskCtx); + return new RunMergingFrameReader(taskCtx, readers, inFrames, new int[] { 0 }, + new IBinaryComparator[] { doubleComparatorFactory.createBinaryComparator() }, + doubleNkComputerFactory.createNormalizedKeyComputer(), recordDesc); + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java index 4b418b65c5..2b8d4e2eee 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java @@ -54,6 +54,7 @@ import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlKurtosisAggregateDe import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlKurtosisDistinctAggregateDescriptor; import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor; import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMaxDistinctAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMedianAggregateDescriptor; import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor; import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlMinDistinctAggregateDescriptor; import org.apache.asterix.runtime.aggregates.scalar.ScalarSqlSkewnessAggregateDescriptor; @@ -156,6
+157,7 @@ import org.apache.asterix.runtime.aggregates.std.GlobalSkewnessAggregateDescript import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlKurtosisAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlMaxAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.GlobalSqlMedianAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlMinAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlSkewnessAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor; @@ -178,6 +180,7 @@ import org.apache.asterix.runtime.aggregates.std.IntermediateSkewnessAggregateDe import org.apache.asterix.runtime.aggregates.std.IntermediateSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.IntermediateSqlKurtosisAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMaxAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMedianAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.IntermediateSqlMinAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.IntermediateSqlSkewnessAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.IntermediateSqlStddevAggregateDescriptor; @@ -202,6 +205,7 @@ import org.apache.asterix.runtime.aggregates.std.LocalSkewnessAggregateDescripto import org.apache.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlKurtosisAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.LocalSqlMedianAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor; import
org.apache.asterix.runtime.aggregates.std.LocalSqlSkewnessAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlStddevAggregateDescriptor; @@ -224,6 +228,7 @@ import org.apache.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlKurtosisAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.SqlMedianAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlSkewnessAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.SqlStddevAggregateDescriptor; @@ -816,6 +821,10 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(LocalSqlMinAggregateDescriptor.FACTORY); fc.add(IntermediateSqlMinAggregateDescriptor.FACTORY); fc.add(GlobalSqlMinAggregateDescriptor.FACTORY); + fc.add(SqlMedianAggregateDescriptor.FACTORY); + fc.add(LocalSqlMedianAggregateDescriptor.FACTORY); + fc.add(IntermediateSqlMedianAggregateDescriptor.FACTORY); + fc.add(GlobalSqlMedianAggregateDescriptor.FACTORY); fc.add(SqlStddevAggregateDescriptor.FACTORY); fc.add(LocalSqlStddevAggregateDescriptor.FACTORY); fc.add(IntermediateSqlStddevAggregateDescriptor.FACTORY); @@ -891,6 +900,7 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(ScalarSqlMaxDistinctAggregateDescriptor.FACTORY); fc.add(ScalarSqlMinAggregateDescriptor.FACTORY); fc.add(ScalarSqlMinDistinctAggregateDescriptor.FACTORY); + fc.add(ScalarSqlMedianAggregateDescriptor.FACTORY); fc.add(ScalarSqlStddevAggregateDescriptor.FACTORY); fc.add(ScalarSqlStddevDistinctAggregateDescriptor.FACTORY); fc.add(ScalarSqlStddevPopAggregateDescriptor.FACTORY); diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java index bc763bd886..7d484b6a89 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java @@ -116,6 +116,9 @@ public final class FunctionTypeInferers { } }; + public static final IFunctionTypeInferer MEDIAN_MEMORY = + (expr, fd, context, compilerProps) -> fd.setImmutableStates(compilerProps.getSortMemoryFrames()); + public static final class CastTypeInferer implements IFunctionTypeInferer { @Override public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context, diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java index 4d324f451e..422782e012 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java @@ -44,4 +44,6 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat Class<?> loadClass(String className) throws HyracksException; ClassLoader getClassLoader() throws HyracksException; + /** Returns the next id that is unique within this joblet; median() uses such ids as file ids for its local run files. */ + long nextUniqueId(); } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java similarity index 70% copy from
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java copy to hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java index 53bb7cd9ad..542fda150c 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java @@ -30,38 +30,30 @@ import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; import org.apache.hyracks.api.context.IHyracksCommonContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.partitions.PartitionId; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -public class NetworkInputChannel implements IInputChannel { - private static final Logger LOGGER = LogManager.getLogger(); +public class FileNetworkInputChannel implements IInputChannel { - static final int INITIAL_MESSAGE_SIZE = 20; + private static final int NUM_READ_BUFFERS = 1; + public static final long FILE_CHANNEL_CODE = -1; private final IChannelConnectionFactory netManager; - private final SocketAddress remoteAddress; - - private final PartitionId partitionId; - + private final long jobId; + private final long fileId; private final Queue<ByteBuffer> fullQueue; - private final int nBuffers; - private IChannelControlBlock ccb; - private IInputChannelMonitor monitor; - private Object attachment; - public NetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, - PartitionId partitionId, int nBuffers) { + public FileNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, long jobId, + long fileId) { this.netManager = netManager; this.remoteAddress = remoteAddress; - this.partitionId = partitionId; -
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers); - this.nBuffers = nBuffers; + this.jobId = jobId; + this.fileId = fileId; + this.fullQueue = new ArrayDeque<>(NUM_READ_BUFFERS); + this.nBuffers = NUM_READ_BUFFERS; } @Override @@ -98,18 +90,16 @@ public class NetworkInputChannel implements IInputChannel { throw HyracksDataException.create(e); } ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor()); - ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor()); + ccb.getWriteInterface().setEmptyBufferAcceptor(WriteEmptyBufferAcceptor.INSTANCE); ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getInitialFrameSize()); - ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE); - writeBuffer.putLong(partitionId.getJobId().getId()); - writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId()); - writeBuffer.putInt(partitionId.getSenderIndex()); - writeBuffer.putInt(partitionId.getReceiverIndex()); + /* File request: FILE_CHANNEL_CODE, job id, file id = 3 longs = 24 bytes = NetworkInputChannel.INITIAL_MESSAGE_SIZE. The leading FILE_CHANNEL_CODE (-1) is what lets the receiving NetworkManager tell it apart from a partition request, whose first long is a job id (assumes job ids are never -1; TODO confirm). */ + ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE); + writeBuffer.putLong(FILE_CHANNEL_CODE); + writeBuffer.putLong(jobId); + writeBuffer.putLong(fileId); writeBuffer.flip(); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Sending partition request: " + partitionId + " on channel: " + ccb); - } + ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer); ccb.getWriteInterface().getFullBufferAcceptor().close(); } @@ -128,21 +118,24 @@ public class NetworkInputChannel implements IInputChannel { @Override public void accept(ByteBuffer buffer) { fullQueue.add(buffer); - monitor.notifyDataAvailability(NetworkInputChannel.this, 1); + monitor.notifyDataAvailability(FileNetworkInputChannel.this, 1); } @Override public void close() { - monitor.notifyEndOfStream(NetworkInputChannel.this); + monitor.notifyEndOfStream(FileNetworkInputChannel.this); } @Override public void error(int ecode) { - monitor.notifyFailure(NetworkInputChannel.this,
ecode); + monitor.notifyFailure(FileNetworkInputChannel.this, ecode); } } - private class WriteEmptyBufferAcceptor implements IBufferAcceptor { + private static class WriteEmptyBufferAcceptor implements IBufferAcceptor { + /** Shared instance; accept() ignores the buffer it is given. */ + static final WriteEmptyBufferAcceptor INSTANCE = new WriteEmptyBufferAcceptor(); + @Override public void accept(ByteBuffer buffer) { // do nothing diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java index 53bb7cd9ad..5ae81fbe61 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java @@ -37,7 +37,7 @@ import org.apache.logging.log4j.Logger; public class NetworkInputChannel implements IInputChannel { private static final Logger LOGGER = LogManager.getLogger(); - static final int INITIAL_MESSAGE_SIZE = 20; + public static final int INITIAL_MESSAGE_SIZE = 24; private final IChannelConnectionFactory netManager; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index f179c36fbe..a96dfe5662 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -108,6 +108,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { private final String jobStartTimeZoneId; private final long maxWarnings; + private final AtomicLong uniqueIds; public Joblet(NodeControllerService nodeController,
DeploymentId deploymentId, JobId jobId, INCServiceContext serviceCtx, ActivityClusterGraph acg, @@ -140,6 +141,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { this.jobStartTime = jobStartTime; this.jobStartTimeZoneId = jobStartTimeZoneId; this.maxWarnings = acg.getMaxWarnings(); + this.uniqueIds = new AtomicLong(); } @Override @@ -160,6 +162,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { return env; } + @Override + public long nextUniqueId() { + return uniqueIds.getAndIncrement(); + } + @Override public long getJobStartTime() { return jobStartTime; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java index 6876618b62..468a96981d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java @@ -31,8 +31,12 @@ import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.partitions.PartitionId; +import org.apache.hyracks.comm.channels.FileNetworkInputChannel; import org.apache.hyracks.comm.channels.IChannelConnectionFactory; +import org.apache.hyracks.comm.channels.NetworkInputChannel; import org.apache.hyracks.comm.channels.NetworkOutputChannel; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.partitions.PartitionFileReaderUtil; import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; import
org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener; @@ -47,8 +51,6 @@ public class NetworkManager implements IChannelConnectionFactory { private static final int MAX_CONNECTION_ATTEMPTS = 5; - static final int INITIAL_MESSAGE_SIZE = 20; - private final PartitionManager partitionManager; private final int nBuffers; @@ -113,7 +115,8 @@ public class NetworkManager implements IChannelConnectionFactory { @Override public void channelOpened(ChannelControlBlock channel) { channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel)); - channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE)); + channel.getReadInterface().getEmptyBufferAcceptor() + .accept(ByteBuffer.allocate(NetworkInputChannel.INITIAL_MESSAGE_SIZE)); } } @@ -128,12 +131,13 @@ public class NetworkManager implements IChannelConnectionFactory { @Override public void accept(ByteBuffer buffer) { - PartitionId pid = readInitialMessage(buffer); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb); - } noc = new NetworkOutputChannel(ccb, nBuffers); - partitionManager.registerPartitionRequest(pid, noc); + long id = buffer.getLong(); + if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) { + handleFileRequest(buffer); + } else { + handlePartitionRequest(buffer, id); + } } @Override @@ -147,16 +151,37 @@ public class NetworkManager implements IChannelConnectionFactory { noc.abort(ecode); } } + /** Handles a partition request; its leading job id (jid) has already been read from the buffer by accept(). */ + private void handlePartitionRequest(ByteBuffer buffer, long jid) { + PartitionId pid = readInitialMessage(buffer, jid); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received initial partition request: " + pid + " on channel: " + ccb); + } + partitionManager.registerPartitionRequest(pid, noc); + } + /** Handles a file request (job id, file id): if partitionManager.registerFileRequest() accepts it, the file is written to noc via writeFileToChannel(). NOTE(review): when registration is refused nothing is written to or closed on noc here; confirm the requester is notified elsewhere. */ + private void handleFileRequest(ByteBuffer buffer) { + JobId jobId = new JobId(buffer.getLong()); + long fileId = buffer.getLong(); + if (partitionManager.registerFileRequest(jobId,
noc)) { + writeFileToChannel(partitionManager.getNodeControllerService(), noc, jobId, fileId); + } + } } - private static PartitionId readInitialMessage(ByteBuffer buffer) { - JobId jobId = new JobId(buffer.getLong()); + private static PartitionId readInitialMessage(ByteBuffer buffer, long jid) { + JobId jobId = new JobId(jid); ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt()); int senderIndex = buffer.getInt(); int receiverIndex = buffer.getInt(); return new PartitionId(jobId, cdid, senderIndex, receiverIndex); } + private static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId, + long fileId) { + PartitionFileReaderUtil.writeFileToChannel(ncs, noc, jobId, fileId); + } + public MuxDemuxPerformanceCounters getPerformanceCounters() { return md.getPerformanceCounters(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java new file mode 100644 index 0000000000..e4e6f092b8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/JobFileState.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.partitions; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hyracks.api.dataflow.state.IStateObject; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.job.JobId; +/** Joblet-level state that associates a job file (FileReference) with a JobFileId key built from a numeric file id. It reports zero memory occupancy, and its toBytes/fromBytes methods are no-ops. */ +public class JobFileState implements IStateObject { + + private final FileReference fileRef; + private final JobId jobId; + private final JobFileId jobFileId; + + public JobFileState(FileReference fileRef, JobId jobId, long fileId) { + this.fileRef = fileRef; + this.jobFileId = new JobFileId(fileId); + this.jobId = jobId; + } + + @Override + public JobId getJobId() { + return jobId; + } + + @Override + public Object getId() { + return jobFileId; + } + + @Override + public long getMemoryOccupancy() { + return 0; + } + + @Override + public void toBytes(DataOutput out) throws IOException { + /* NOTE(review): no-op, nothing is serialized; confirm this state never needs to be persisted or transferred. */ + } + + @Override + public void fromBytes(DataInput in) throws IOException { + /* NOTE(review): no-op counterpart of toBytes(); nothing is restored. */ + } + + public FileReference getFileRef() { + return fileRef; + } + /** Key object returned by getId(); equals() and hashCode() depend only on the numeric file id. */ + public static class JobFileId { + + private final long fileId; + + public JobFileId(long fileId) { + this.fileId = fileId; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof JobFileId)) { + return false; + } + JobFileId otherFileId = (JobFileId) o; + return otherFileId.fileId == fileId; + } + + @Override + public int hashCode() { + return Long.hashCode(fileId); + } + } +} diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java index 3e369469d6..218e557a1c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java @@ -18,14 +18,11 @@ */ package org.apache.hyracks.control.nc.partitions; -import java.nio.ByteBuffer; import java.util.concurrent.Executor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.partitions.IPartition; @@ -60,45 +57,7 @@ public class MaterializedPartition implements IPartition { @Override public void writeTo(final IFrameWriter writer) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - if (partitionFile == null) { - writer.open(); - writer.close(); - return; - } - IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - try { - writer.open(); - try { - long offset = 0; - ByteBuffer buffer = ctx.allocateFrame(); - while (true) { - buffer.clear(); - long size = ioManager.syncRead(fh, offset, buffer); - if (size < 0) { - break; - } else if (size < buffer.capacity()) { - throw new HyracksDataException("Premature end of file"); - } - offset += size; - buffer.flip(); - writer.nextFrame(buffer); - } - } finally { - writer.close(); - } - } finally { 
- ioManager.close(fh); - } - } catch (HyracksDataException e) { - throw new RuntimeException(e); - } - } - }); + executor.execute(new PartitionFileReader(ctx, partitionFile, ioManager, writer)); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java new file mode 100644 index 0000000000..a0d8dcd665 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.partitions; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; + +public class PartitionFileReader implements Runnable { + + private final IHyracksCommonContext ctx; + private final FileReference partitionFile; + private final IIOManager ioManager; + private final IFrameWriter writer; + + public PartitionFileReader(IHyracksCommonContext ctx, FileReference partitionFile, IIOManager ioManager, + IFrameWriter writer) { + this.ctx = ctx; + this.partitionFile = partitionFile; + this.ioManager = ioManager; + this.writer = writer; + } + + @Override + public void run() { + try { + if (partitionFile == null) { + writer.open(); + writer.close(); + return; + } + IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + try { + writer.open(); + try { + long offset = 0; + ByteBuffer buffer = ctx.allocateFrame(); + while (true) { + buffer.clear(); + long size = ioManager.syncRead(fh, offset, buffer); + if (size < 0) { + break; + } else if (size < buffer.capacity()) { + throw new HyracksDataException("Premature end of file"); + } + offset += size; + buffer.flip(); + writer.nextFrame(buffer); + } + } finally { + writer.close(); + } + } finally { + ioManager.close(fh); + } + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java new file 
mode 100644 index 0000000000..2675c279ee --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.partitions; + +import java.util.concurrent.ExecutorService; + +import org.apache.hyracks.api.dataflow.state.IStateObject; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.comm.channels.NetworkOutputChannel; +import org.apache.hyracks.control.nc.Joblet; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface; + +public class PartitionFileReaderUtil { + + private PartitionFileReaderUtil() { + } + + public static void writeFileToChannel(NodeControllerService ncs, NetworkOutputChannel noc, JobId jobId, + long fileId) { + Joblet joblet = ncs.getJobletMap().get(jobId); + if (joblet == null) { + noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); + return; + } + IStateObject stateObject = joblet.getEnvironment().getStateObject(new JobFileState.JobFileId(fileId)); + if (!(stateObject instanceof JobFileState)) { + noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); + return; + } + JobFileState fileState = (JobFileState) stateObject; + FileReference fileRef = fileState.getFileRef(); + if (!fileRef.getFile().exists()) { + noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); + return; + } + ExecutorService executor = ncs.getExecutor(); + noc.setFrameSize(joblet.getInitialFrameSize()); + executor.execute(new PartitionFileReader(joblet, fileRef, ncs.getIoManager(), noc)); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java index c082b71fd2..927dd7c126 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java @@ -58,6 +58,8 @@ public class PartitionManager { private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>(); + private final Map<JobId, List<NetworkOutputChannel>> fileRequests = new HashMap<>(); + private final Cache<JobId, JobId> failedJobsCache; public PartitionManager(NodeControllerService ncs) { @@ -122,10 +124,24 @@ public class PartitionManager { } } + public synchronized boolean registerFileRequest(JobId jid, NetworkOutputChannel noc) { + if (failedJobsCache.getIfPresent(jid) != null) { + noc.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); + return false; + } + List<NetworkOutputChannel> netOutChannels = fileRequests.computeIfAbsent(jid, k -> new ArrayList<>()); + netOutChannels.add(noc); + return true; + } + public IWorkspaceFileFactory getFileFactory() { return fileFactory; } + public NodeControllerService getNodeControllerService() { + return ncs; + } + public void close() { deallocatableRegistry.close(); } @@ -175,6 +191,10 @@ public class PartitionManager { } private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) { + List<NetworkOutputChannel> jobFileRequests = null; + if (!fileRequests.isEmpty()) { + jobFileRequests = fileRequests.remove(jobId); + } if (status != JobStatus.FAILURE) { return Collections.emptyList(); } @@ -189,6 +209,9 @@ public class PartitionManager { requestsIterator.remove(); } } + if (jobFileRequests != null && !jobFileRequests.isEmpty()) { + pendingRequests.addAll(jobFileRequests); + } return pendingRequests; } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java index 50cacb260f..7051e1d921 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java @@ -115,6 +115,10 @@ public class RunFileReader implements IFrameReader { return size; } + public FileReference getFile() { + return file; + } + public void setDeleteAfterClose(boolean deleteAfterClose) { this.deleteAfterClose = deleteAfterClose; } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java index 46e3eec315..bf4cbd0942 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java @@ -20,6 +20,7 @@ package org.apache.hyracks.test.support; import java.nio.ByteBuffer; import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.context.IHyracksJobletContext; @@ -42,6 +43,7 @@ public class TestJobletContext implements IHyracksJobletContext { private final WorkspaceFileFactory fileFactory; private final long jobStartTime; private final String jobStartTimeZoneId; + private final AtomicLong ids; TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException { this.serviceContext = serviceContext; @@ -50,6 +52,7 @@ public class TestJobletContext implements IHyracksJobletContext { this.frameManger = new FrameManager(frameSize); this.jobStartTime = System.currentTimeMillis(); this.jobStartTimeZoneId = ZoneId.systemDefault().getId(); + this.ids = new AtomicLong(); } @Override @@ -151,4 +154,8 @@ public 
class TestJobletContext implements IHyracksJobletContext { return this.getClass().getClassLoader(); } + @Override + public long nextUniqueId() { + return ids.getAndIncrement(); + } }
