Repository: asterixdb Updated Branches: refs/heads/master a993e9bf8 -> 5af85d9ed
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateDescriptor.java new file mode 100644 index 0000000..9134c81 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class GlobalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = GlobalSqlSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.GLOBAL_SQL_SUM; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new GlobalSqlSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateFunction.java new file mode 100644 index 0000000..bbff30a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class GlobalSqlSumAggregateFunction extends AbstractSumAggregateFunction { + + public GlobalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finish(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return false; + } + + // Handle NULL step + @Override + protected void processNull() { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateDescriptor.java new file mode 100644 index 0000000..930821f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class GlobalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = GlobalSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.GLOBAL_SUM; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new GlobalSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateFunction.java new file mode 100644 index 0000000..9aae327 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class GlobalSumAggregateFunction extends AbstractSumAggregateFunction { + + public GlobalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finish(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateDescriptor.java new file mode 100644 index 0000000..43c1f1a --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class IntermediateSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = IntermediateSqlSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.INTERMEDIATE_SQL_SUM; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new IntermediateSqlSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateFunction.java new file mode 100644 index 0000000..e266516 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class IntermediateSqlSumAggregateFunction extends AbstractSumAggregateFunction { + + public IntermediateSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finishPartial(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return false; + } + + // Handle NULL step + @Override + protected void processNull() { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateDescriptor.java new file mode 100644 index 0000000..7c76905 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class IntermediateSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = IntermediateSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.INTERMEDIATE_SUM; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new IntermediateSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateFunction.java new file mode 100644 index 0000000..2cec29d --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class IntermediateSumAggregateFunction extends AbstractSumAggregateFunction { + + public IntermediateSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finishPartial(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java index 1262399..184f163 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,17 +31,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - private final static FunctionIdentifier FID = BuiltinFunctions.LOCAL_SQL_SUM; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new LocalSqlSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = LocalSqlSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { - return FID; + return BuiltinFunctions.LOCAL_SQL_SUM; } @Override @@ -53,7 +46,7 @@ public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDyn @Override public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new SqlSumAggregateFunction(args, ctx, true, sourceLoc); + return new LocalSqlSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateFunction.java new file mode 100644 index 0000000..df19968 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlSumAggregateFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class LocalSqlSumAggregateFunction extends AbstractSumAggregateFunction { + + public LocalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finishPartial(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return false; + } + + // Handle NULL step + @Override + protected void processNull() { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() throws HyracksDataException { + throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM, + ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java index 0e11541..2fafe32 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,17 +31,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - private final static FunctionIdentifier FID = BuiltinFunctions.LOCAL_SUM; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new LocalSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = LocalSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { - return FID; + return BuiltinFunctions.LOCAL_SUM; } @Override @@ -53,7 +46,7 @@ public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynami @Override public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new SumAggregateFunction(args, ctx, true, sourceLoc); + return new LocalSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateFunction.java new file mode 100644 index 0000000..0c3a692 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSumAggregateFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; + +public class LocalSumAggregateFunction extends AbstractSumAggregateFunction { + + public LocalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finishPartial(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() throws HyracksDataException { + throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM, + ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } + + // Handle NULL finish + @Override + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + result.set(resultStorage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java index b7fb755..6e73a40 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SqlSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SqlSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -52,7 +46,7 @@ public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicD @Override public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new SqlSumAggregateFunction(args, ctx, false, sourceLoc); + return new SqlSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java index 8b4bf7c..0efa4f1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java @@ -18,52 +18,64 @@ */ package org.apache.asterix.runtime.aggregates.std; -import java.io.IOException; - -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.om.base.ANull; -import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; public class SqlSumAggregateFunction extends AbstractSumAggregateFunction { - private final boolean isLocalAgg; - public SqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isLocalAgg, + public SqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException { super(args, context, sourceLoc); - this.isLocalAgg = isLocalAgg; } + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finish(result); + } + + // Is skip + @Override + protected boolean skipStep() { + return false; + } + + // Handle NULL step @Override protected void processNull() { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing } + // Handle NULL finish @Override - protected void processSystemNull() throws HyracksDataException { - // For global aggregates simply ignore system null here, - // but if all input value are system null, then we should return - // null in finish(). - if (isLocalAgg) { - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM, - ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); - } + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); } - @SuppressWarnings("unchecked") + // Handle SYSTEM_NULL finish @Override - protected void finishSystemNull() throws IOException { - // Empty stream. For local agg return system null. For global agg return null. - if (isLocalAgg) { - resultStorage.getDataOutput().writeByte(ATypeTag.SYSTEM_NULL.serialize()); - } else { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, resultStorage.getDataOutput()); - } + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateDescriptor.java index df429b0..1c9623e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -52,7 +46,7 @@ public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDesc @Override public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new SumAggregateFunction(args, ctx, false, sourceLoc); + return new SumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateFunction.java index abcac77..51a5e23 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SumAggregateFunction.java @@ -18,58 +18,64 @@ */ package org.apache.asterix.runtime.aggregates.std; -import java.io.IOException; - -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.om.base.ANull; -import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.IOException; public class SumAggregateFunction extends AbstractSumAggregateFunction { - private final boolean isLocalAgg; - public SumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isLocalAgg, - SourceLocation sourceLoc) throws HyracksDataException { + public SumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, SourceLocation sourceLoc) + throws HyracksDataException { super(args, context, sourceLoc); - this.isLocalAgg = isLocalAgg; } + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + super.step(tuple); + } + + // Finish calculation + @Override + public void finish(IPointable result) throws HyracksDataException { + super.finish(result); + } + + // Is skip @Override protected boolean skipStep() { - return (aggType == ATypeTag.NULL); + return aggType == ATypeTag.NULL; } + // Handle NULL step @Override protected void processNull() { aggType = ATypeTag.NULL; } + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish @Override - protected void processSystemNull() throws HyracksDataException { - // For global aggregates simply ignore system null here, - // but if all input value are system null, then we should return - // null in finish(). - if (isLocalAgg) { - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, - ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); - } + protected void finishNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); } - @SuppressWarnings("unchecked") + // Handle SYSTEM_NULL finish @Override - protected void finishSystemNull() throws IOException { - // Empty stream. For local agg return system null. For global agg return null. - if (isLocalAgg) { - resultStorage.getDataOutput().writeByte(ATypeTag.SYSTEM_NULL.serialize()); - } else { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, resultStorage.getDataOutput()); - } + protected void finishSystemNull(IPointable result) throws IOException { + resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + result.set(resultStorage); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java index 1b0c7b6..dd1d862 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java @@ -55,20 +55,24 @@ import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobal import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlStddevAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlStddevPopAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlSumAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalStddevAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalStddevPopAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlVarAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlVarPopAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalSumAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalVarAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableGlobalVarPopAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlStddevAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlStddevPopAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlSumAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateStddevAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateStddevPopAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlVarAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSqlVarPopAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateSumAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateVarAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableIntermediateVarPopAggregateDescriptor; import org.apache.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor; @@ -98,6 +102,10 @@ import org.apache.asterix.runtime.aggregates.serializable.std.SerializableVarPop import org.apache.asterix.runtime.aggregates.std.AvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.CountAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.GlobalSqlSumAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.GlobalSumAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.IntermediateSqlSumAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.IntermediateSumAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.RangeMapAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor; @@ -509,6 +517,8 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(GlobalAvgAggregateDescriptor.FACTORY); fc.add(SumAggregateDescriptor.FACTORY); fc.add(LocalSumAggregateDescriptor.FACTORY); + fc.add(IntermediateSumAggregateDescriptor.FACTORY); + fc.add(GlobalSumAggregateDescriptor.FACTORY); fc.add(MaxAggregateDescriptor.FACTORY); fc.add(LocalMaxAggregateDescriptor.FACTORY); fc.add(MinAggregateDescriptor.FACTORY); @@ -542,6 +552,8 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(SerializableGlobalAvgAggregateDescriptor.FACTORY); fc.add(SerializableSumAggregateDescriptor.FACTORY); fc.add(SerializableLocalSumAggregateDescriptor.FACTORY); + fc.add(SerializableIntermediateSumAggregateDescriptor.FACTORY); + fc.add(SerializableGlobalSumAggregateDescriptor.FACTORY); fc.add(SerializableStddevAggregateDescriptor.FACTORY); fc.add(SerializableLocalStddevAggregateDescriptor.FACTORY); fc.add(SerializableIntermediateStddevAggregateDescriptor.FACTORY); @@ -580,6 +592,8 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(GlobalSqlAvgAggregateDescriptor.FACTORY); fc.add(SqlSumAggregateDescriptor.FACTORY); fc.add(LocalSqlSumAggregateDescriptor.FACTORY); + fc.add(IntermediateSqlSumAggregateDescriptor.FACTORY); + fc.add(GlobalSqlSumAggregateDescriptor.FACTORY); fc.add(SqlMaxAggregateDescriptor.FACTORY); fc.add(LocalSqlMaxAggregateDescriptor.FACTORY); fc.add(SqlMinAggregateDescriptor.FACTORY); @@ -609,6 +623,8 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY); fc.add(SerializableSqlSumAggregateDescriptor.FACTORY); fc.add(SerializableLocalSqlSumAggregateDescriptor.FACTORY); + fc.add(SerializableIntermediateSqlSumAggregateDescriptor.FACTORY); + fc.add(SerializableGlobalSqlSumAggregateDescriptor.FACTORY); fc.add(SerializableSqlStddevAggregateDescriptor.FACTORY); fc.add(SerializableLocalSqlStddevAggregateDescriptor.FACTORY); fc.add(SerializableIntermediateSqlStddevAggregateDescriptor.FACTORY);
