http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java index 132fd54..3c07b47 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java @@ -18,30 +18,18 @@ */ package org.apache.asterix.runtime.aggregates.serializable.std; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.dataflow.data.nontagged.serde.*; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.om.base.AMutableDouble; -import org.apache.asterix.om.base.AMutableFloat; -import org.apache.asterix.om.base.AMutableInt16; -import org.apache.asterix.om.base.AMutableInt32; import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.base.AMutableInt8; -import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.EnumDeserializer; -import org.apache.asterix.om.types.hierachy.ATypeHierarchy; -import org.apache.asterix.runtime.exceptions.IncompatibleTypeException; +import org.apache.asterix.runtime.exceptions.OverflowException; import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -52,187 +40,229 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import java.io.DataOutput; +import java.io.IOException; + public abstract class AbstractSerializableSumAggregateFunction extends AbstractSerializableAggregateFunction { + + // Handles evaluating and storing/passing serialized data protected static final int AGG_TYPE_OFFSET = 0; private static final int SUM_OFFSET = 1; - private IPointable inputVal = new VoidPointable(); private IScalarEvaluator eval; - private AMutableDouble aDouble = new AMutableDouble(0); - private AMutableFloat aFloat = new AMutableFloat(0); + + // Aggregate type + protected ATypeTag aggType; + + // Result holders private AMutableInt64 aInt64 = new AMutableInt64(0); - private AMutableInt32 aInt32 = new AMutableInt32(0); - private AMutableInt16 aInt16 = new AMutableInt16((short) 0); - private AMutableInt8 aInt8 = new AMutableInt8((byte) 0); + private AMutableDouble aDouble = new AMutableDouble(0); + + // Flags for output type (If all output flags are false, double output is used) + private boolean isUseInt64ForResult = true; + + // Serializer/Deserializer + @SuppressWarnings("rawtypes") + private ISerializerDeserializer aInt64Serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); @SuppressWarnings("rawtypes") - public ISerializerDeserializer serde; + private ISerializerDeserializer aDoubleSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); + // Constructor public AbstractSerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException { super(sourceLoc); eval = args[0].createScalarEvaluator(context); } + // Abstract methods + protected abstract boolean skipStep(byte[] state, int start); // Skip step + + protected abstract void processNull(byte[] state, int start); // Handle NULL step + + protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step + + protected abstract void finishNull(DataOutput out) throws IOException; // Handle NULL finish + + protected abstract void finishSystemNull(DataOutput out) throws IOException; // Handle SYSTEM_NULL finish + + // Init the values @Override public void init(DataOutput state) throws HyracksDataException { try { state.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); - state.writeDouble(0.0); + state.writeLong(0); } catch (IOException e) { throw HyracksDataException.create(e); } } + // Called for each incoming tuple @Override public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + // Skip current step if (skipStep(state, start)) { return; } - ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); - double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET); + + // Evaluate/Get the data from the tuple eval.evaluate(tuple, inputVal); byte[] bytes = inputVal.getByteArray(); int offset = inputVal.getStartOffset(); + // Get the data type tag ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]); + + // Handle MISSING and NULL values if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) { processNull(state, start); return; - } else if (aggType == ATypeTag.SYSTEM_NULL) { - aggType = typeTag; - } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { - if (typeTag.ordinal() > aggType.ordinal()) { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset], - aggType.serialize()); - } else { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(), - bytes[offset]); - } - } - - if (ATypeHierarchy.canPromote(aggType, typeTag)) { - aggType = typeTag; } + // Calculate based on the incoming data type + handles invalid data type switch (typeTag) { case TINYINT: { byte val = AInt8SerializerDeserializer.getByte(bytes, offset + 1); - sum += val; + processInt64Value(state, start, val); break; } case SMALLINT: { short val = AInt16SerializerDeserializer.getShort(bytes, offset + 1); - sum += val; + processInt64Value(state, start, val); break; } case INTEGER: { int val = AInt32SerializerDeserializer.getInt(bytes, offset + 1); - sum += val; + processInt64Value(state, start, val); break; } case BIGINT: { long val = AInt64SerializerDeserializer.getLong(bytes, offset + 1); - sum += val; + processInt64Value(state, start, val); break; } case FLOAT: { + upgradeOutputType(); float val = AFloatSerializerDeserializer.getFloat(bytes, offset + 1); - sum += val; + processFloatValue(state, start, val); break; } case DOUBLE: { + upgradeOutputType(); double val = ADoubleSerializerDeserializer.getDouble(bytes, offset + 1); - sum += val; + processFloatValue(state, start, val); break; } - case NULL: { - aggType = typeTag; - break; - } - case SYSTEM_NULL: { + case SYSTEM_NULL: processSystemNull(); break; - } default: throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset]); } - state[start + AGG_TYPE_OFFSET] = aggType.serialize(); + } + + // Upgrade the output type + private void upgradeOutputType() { + isUseInt64ForResult = false; + } + + // Process int64 value + private void processInt64Value(byte[] state, int start, long value) throws HyracksDataException { + // Check the output flag first + if (!isUseInt64ForResult) { + processFloatValue(state, start, value); + } + + // Int64 output, watch out for overflow exception + else { + try { + // Current total + long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET); + sum = Math.addExact(sum, value); + + // Write the output + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_INT64_TYPE_TAG; + BufferSerDeUtil.writeLong(sum, state, start + SUM_OFFSET); + } catch (ArithmeticException ignored) { + throw new OverflowException(sourceLoc, getIdentifier()); + } + } + } + + // Process float value + private void processFloatValue(byte[] state, int start, double value) { + double sum; + aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); + + // This checks if the previous written value is bigint, SYSTEM_NULL or double and reads it accordingly + // Important: SYSTEM_NULL reads int64 because the written value is in64 as well, check the init() method + if (aggType == ATypeTag.BIGINT || aggType == ATypeTag.SYSTEM_NULL) { + // Last write was a bigint or SYSTEM_NULL + sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET); + } else { + // Last write was a double + sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET); + } + + // Add the value + sum += value; + + // Write the output + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG; BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET); } @SuppressWarnings("unchecked") @Override + public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + // finishPartial() has identical behavior to finish() + finishFinal(state, start, len, out); + } + + @SuppressWarnings("unchecked") + @Override public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { - ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); - double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET); + // Finish + finishFinal(state, start, len, out); + } + + @SuppressWarnings({ "unchecked", "unused" }) + private void finishFinal(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); + try { - switch (aggType) { - case TINYINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8); - aInt8.setValue((byte) sum); - serde.serialize(aInt8, out); - break; + // aggType is SYSTEM_NULL (ran over zero values) + if (aggType == ATypeTag.SYSTEM_NULL) { + if (GlobalConfig.DEBUG) { + GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values."); } - case SMALLINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16); - aInt16.setValue((short) sum); - serde.serialize(aInt16, out); - break; - } - case INTEGER: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); - aInt32.setValue((int) sum); - serde.serialize(aInt32, out); - break; - } - case BIGINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); - aInt64.setValue((long) sum); - serde.serialize(aInt64, out); - break; - } - case FLOAT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT); - aFloat.setValue((float) sum); - serde.serialize(aFloat, out); - break; - } - case DOUBLE: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); + + finishSystemNull(out); + } + // aggType is NULL + else if (aggType == ATypeTag.NULL) { + finishNull(out); + } + // Pass the result + else { + if (isUseInt64ForResult) { + long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET); + aInt64.setValue(sum); + aInt64Serde.serialize(aInt64, out); + } else { + double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET); aDouble.setValue(sum); - serde.serialize(aDouble, out); - break; - } - case NULL: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, out); - break; - } - case SYSTEM_NULL: { - finishSystemNull(out); - break; + aDoubleSerde.serialize(aDouble, out); } - default: - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize()); } } catch (IOException e) { throw HyracksDataException.create(e); } } - @Override - public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { - finish(state, start, len, out); - } - - protected boolean skipStep(byte[] state, int start) { - return false; + // Function identifier + private FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SUM; } - - protected abstract void processNull(byte[] state, int start); - - protected abstract void processSystemNull() throws HyracksDataException; - - protected abstract void finishSystemNull(DataOutput out) throws IOException; - }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java new file mode 100644 index 0000000..3e74308 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SerializableGlobalSqlSumAggregateDescriptor + extends AbstractSerializableAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSqlSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SERIAL_GLOBAL_SQL_SUM; + } + + @Override + public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory( + final IScalarEvaluatorFactory[] args) { + return new ISerializedAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + return new SerializableGlobalSqlSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java new file mode 100644 index 0000000..2d8fa61 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableGlobalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableGlobalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finish(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + return false; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java new file mode 100644 index 0000000..24bfcc9 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SerializableGlobalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SERIAL_GLOBAL_SUM; + } + + @Override + public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory( + final IScalarEvaluatorFactory[] args) { + return new ISerializedAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + return new SerializableGlobalSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java new file mode 100644 index 0000000..35e2f89 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.EnumDeserializer; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableGlobalSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableGlobalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finish(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java new file mode 100644 index 0000000..8691756 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SerializableIntermediateSqlSumAggregateDescriptor + extends AbstractSerializableAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSqlSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SERIAL_INTERMEDIATE_SQL_SUM; + } + + @Override + public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory( + final IScalarEvaluatorFactory[] args) { + return new ISerializedAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + return new SerializableIntermediateSqlSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java new file mode 100644 index 0000000..04eeed3 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableIntermediateSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableIntermediateSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finishPartial(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + return false; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java new file mode 100644 index 0000000..d1d2c0f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SerializableIntermediateSumAggregateDescriptor + extends AbstractSerializableAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSumAggregateDescriptor::new; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SERIAL_INTERMEDIATE_SUM; + } + + @Override + public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory( + final IScalarEvaluatorFactory[] args) { + return new ISerializedAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) + throws HyracksDataException { + return new SerializableIntermediateSumAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java new file mode 100644 index 0000000..4adc845 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.EnumDeserializer; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableIntermediateSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableIntermediateSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finishPartial(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java index 3798b49..79b31c4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.serializable.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SerializableLocalSqlSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSqlSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -53,7 +47,7 @@ public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializ @Override public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws HyracksDataException { - return new SerializableSqlSumAggregateFunction(args, true, ctx, sourceLoc); + return new SerializableLocalSqlSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java new file mode 100644 index 0000000..9aea883 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableLocalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableLocalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finishPartial(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + return false; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + // Do nothing + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() throws HyracksDataException { + throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, + ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java index 54d24f7..512d211 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java @@ -19,25 +19,19 @@ package org.apache.asterix.runtime.aggregates.serializable.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator; import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SerializableLocalSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -53,7 +47,7 @@ public class SerializableLocalSumAggregateDescriptor extends AbstractSerializabl @Override public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws HyracksDataException { - return new SerializableSumAggregateFunction(args, true, ctx, sourceLoc); + return new SerializableLocalSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java new file mode 100644 index 0000000..8a9e0f9 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.serializable.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.EnumDeserializer; +import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; + +public class SerializableLocalSumAggregateFunction extends AbstractSerializableSumAggregateFunction { + + public SerializableLocalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finishPartial(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); + return aggType == ATypeTag.NULL; + } + + // Handle NULL step + @Override + protected void processNull(byte[] state, int start) { + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG; + } + + // Handle SYSTEM_NULL step + @Override + protected void processSystemNull() throws HyracksDataException { + throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, + ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } + + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java index d51a6fc..0dc5e17 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.serializable.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SerializableSqlSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SerializableSqlSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -53,7 +47,7 @@ public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableA @Override public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws HyracksDataException { - return new SerializableSqlSumAggregateFunction(args, false, ctx, sourceLoc); + return new SerializableSqlSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java index 38033f0..3062a37 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java @@ -18,54 +18,62 @@ */ package org.apache.asterix.runtime.aggregates.serializable.std; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.om.base.ANull; -import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction { - private final boolean isLocalAgg; - public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg, - IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException { + public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { super(args, context, sourceLoc); - this.isLocalAgg = isLocalAgg; } + // Called for each incoming tuple + @Override + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finish(state, start, len, out); + } + + // Is skip + @Override + protected boolean skipStep(byte[] state, int start) { + return false; + } + + // Handle NULL step @Override protected void processNull(byte[] state, int start) { + // Do nothing } + // Handle SYSTEM_NULL step @Override - protected void processSystemNull() throws HyracksDataException { - // For global aggregates simply ignore system null here, - // but if all input value are system null, then we should return - // null in finish(). - if (isLocalAgg) { - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM, - ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); - } + protected void processSystemNull() { + // Do nothing } - @SuppressWarnings("unchecked") + // Handle NULL finish @Override - protected void finishSystemNull(DataOutput out) throws IOException { - // Empty stream. For local agg return system null. For global agg return null. - if (isLocalAgg) { - out.writeByte(ATypeTag.SYSTEM_NULL.serialize()); - } else { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, out); - } + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); } + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java index 43eea5b..7619ca8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java @@ -19,7 +19,6 @@ package org.apache.asterix.runtime.aggregates.serializable.std; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { - @Override - public IFunctionDescriptor createFunctionDescriptor() { - return new SerializableSumAggregateDescriptor(); - } - }; + public static final IFunctionDescriptorFactory FACTORY = SerializableSumAggregateDescriptor::new; @Override public FunctionIdentifier getIdentifier() { @@ -53,7 +47,7 @@ public class SerializableSumAggregateDescriptor extends AbstractSerializableAggr @Override public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws HyracksDataException { - return new SerializableSumAggregateFunction(args, false, ctx, sourceLoc); + return new SerializableSumAggregateFunction(args, ctx, sourceLoc); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java index 278914f..0be49e7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java @@ -18,62 +18,64 @@ */ package org.apache.asterix.runtime.aggregates.serializable.std; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.om.base.ANull; -import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.EnumDeserializer; -import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +import java.io.DataOutput; +import java.io.IOException; public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction { - private final boolean isLocalAgg; - public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg, - IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException { + public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { super(args, context, sourceLoc); - this.isLocalAgg = isLocalAgg; } + // Called for each incoming tuple @Override - protected void processNull(byte[] state, int start) { - state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG; + public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException { + super.step(tuple, state, start, len); + } + + // Finish calculation + @Override + public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException { + super.finish(state, start, len, out); } + // Is skip @Override protected boolean skipStep(byte[] state, int start) { ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]); return aggType == ATypeTag.NULL; } + // Handle NULL step @Override - protected void processSystemNull() throws HyracksDataException { - // For global aggregates simply ignore system null here, - // but if all input value are system null, then we should return - // null in finish(). - if (isLocalAgg) { - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, - ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); - } + protected void processNull(byte[] state, int start) { + state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG; } - @SuppressWarnings("unchecked") + // Handle SYSTEM_NULL step @Override - protected void finishSystemNull(DataOutput out) throws IOException { - // Empty stream. For local agg return system null. For global agg return null. - if (isLocalAgg) { - out.writeByte(ATypeTag.SYSTEM_NULL.serialize()); - } else { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, out); - } + protected void processSystemNull() { + // Do nothing + } + + // Handle NULL finish + @Override + protected void finishNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); } + // Handle SYSTEM_NULL finish + @Override + protected void finishSystemNull(DataOutput out) throws IOException { + out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java index 037e307..836c24e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java @@ -18,29 +18,18 @@ */ package org.apache.asterix.runtime.aggregates.std; -import java.io.IOException; - -import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer; -import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.dataflow.data.nontagged.serde.*; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.om.base.AMutableDouble; -import org.apache.asterix.om.base.AMutableFloat; -import org.apache.asterix.om.base.AMutableInt16; -import org.apache.asterix.om.base.AMutableInt32; import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.base.AMutableInt8; -import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.EnumDeserializer; -import org.apache.asterix.om.types.hierachy.ATypeHierarchy; -import org.apache.asterix.runtime.exceptions.IncompatibleTypeException; +import org.apache.asterix.runtime.exceptions.OverflowException; import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -52,91 +41,119 @@ import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import java.io.IOException; + public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunction { + + // Handles evaluating and storing/passing serialized data protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); private IPointable inputVal = new VoidPointable(); private IScalarEvaluator eval; - private double sum; + + // Aggregate type protected ATypeTag aggType; - private AMutableDouble aDouble = new AMutableDouble(0); - private AMutableFloat aFloat = new AMutableFloat(0); + + // Result holders + private long sumInt64; + private double sumDouble; private AMutableInt64 aInt64 = new AMutableInt64(0); - private AMutableInt32 aInt32 = new AMutableInt32(0); - private AMutableInt16 aInt16 = new AMutableInt16((short) 0); - private AMutableInt8 aInt8 = new AMutableInt8((byte) 0); + private AMutableDouble aDouble = new AMutableDouble(0); + + // Flags for output type (If all output flags are false, double output is used) + private boolean isUseInt64ForResult = true; + + // Serializer/Deserializer @SuppressWarnings("rawtypes") - protected ISerializerDeserializer serde; + private ISerializerDeserializer aInt64Serde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); + @SuppressWarnings("rawtypes") + private ISerializerDeserializer aDoubleSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); + // Constructor public AbstractSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException { super(sourceLoc); eval = args[0].createScalarEvaluator(context); } + // Abstract methods + protected abstract boolean skipStep(); // Skip step + + protected abstract void processNull(); // Handle NULL step + + protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step + + protected abstract void finishNull(IPointable result) throws IOException; // Handle NULL finish + + protected abstract void finishSystemNull(IPointable result) throws IOException; // Handle SYSTEM_NULL finish + + // Init the values @Override public void init() throws HyracksDataException { aggType = ATypeTag.SYSTEM_NULL; - sum = 0.0; + sumInt64 = 0; + sumDouble = 0.0; } + // Called for each incoming tuple @Override public void step(IFrameTupleReference tuple) throws HyracksDataException { + // Skip current step if (skipStep()) { return; } + + // Evaluate/Get the data from the tuple eval.evaluate(tuple, inputVal); byte[] data = inputVal.getByteArray(); int offset = inputVal.getStartOffset(); + // Get the data type tag ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]); + + // Handle MISSING and NULL values if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) { processNull(); return; - } else if (aggType == ATypeTag.SYSTEM_NULL) { - aggType = typeTag; - } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { - if (typeTag.ordinal() > aggType.ordinal()) { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize(), - aggType.serialize()); - } else { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(), - typeTag.serialize()); - } } - - if (ATypeHierarchy.canPromote(aggType, typeTag)) { + // Non-missing and Non-null + else if (aggType == ATypeTag.SYSTEM_NULL) { aggType = typeTag; } + // Calculate based on the incoming data type + handles invalid data type switch (typeTag) { case TINYINT: { byte val = AInt8SerializerDeserializer.getByte(data, offset + 1); - sum += val; + processInt64Value(val); break; } case SMALLINT: { short val = AInt16SerializerDeserializer.getShort(data, offset + 1); - sum += val; + processInt64Value(val); break; } case INTEGER: { int val = AInt32SerializerDeserializer.getInt(data, offset + 1); - sum += val; + processInt64Value(val); break; } case BIGINT: { long val = AInt64SerializerDeserializer.getLong(data, offset + 1); - sum += val; + processInt64Value(val); break; } case FLOAT: { + upgradeOutputType(); float val = AFloatSerializerDeserializer.getFloat(data, offset + 1); - sum += val; + processFloatValue(val); break; } case DOUBLE: { + upgradeOutputType(); double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1); - sum += val; + processFloatValue(val); break; } case SYSTEM_NULL: { @@ -144,83 +161,95 @@ public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunc break; } default: { - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize()); + throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize()); } } } - @SuppressWarnings("unchecked") - @Override - public void finish(IPointable result) throws HyracksDataException { - resultStorage.reset(); - try { - switch (aggType) { - case TINYINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8); - aInt8.setValue((byte) sum); - serde.serialize(aInt8, resultStorage.getDataOutput()); - break; - } - case SMALLINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16); - aInt16.setValue((short) sum); - serde.serialize(aInt16, resultStorage.getDataOutput()); - break; - } - case INTEGER: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); - aInt32.setValue((int) sum); - serde.serialize(aInt32, resultStorage.getDataOutput()); - break; - } - case BIGINT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); - aInt64.setValue((long) sum); - serde.serialize(aInt64, resultStorage.getDataOutput()); - break; - } - case FLOAT: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT); - aFloat.setValue((float) sum); - serde.serialize(aFloat, resultStorage.getDataOutput()); - break; - } - case DOUBLE: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); - aDouble.setValue(sum); - serde.serialize(aDouble, resultStorage.getDataOutput()); - break; - } - case NULL: { - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); - serde.serialize(ANull.NULL, resultStorage.getDataOutput()); - break; - } - case SYSTEM_NULL: { - finishSystemNull(); - break; - } - default: - throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize()); + // Upgrade the output type + private void upgradeOutputType() { + isUseInt64ForResult = false; + } + + // Process int64 value + private void processInt64Value(long value) throws HyracksDataException { + // Check the output flag first + if (!isUseInt64ForResult) { + processFloatValue(value); + } + // Int64 output, watch out for overflow exception + else { + try { + sumInt64 = Math.addExact(sumInt64, value); + sumDouble = sumInt64; // Keep the sumDouble variable up-to-date as well + } catch (ArithmeticException ignored) { + throw new OverflowException(sourceLoc, getIdentifier()); } - } catch (IOException e) { - throw HyracksDataException.create(e); } - result.set(resultStorage); } + // Process float value + private void processFloatValue(double value) { + // If this method is called, it means the output is going to be a double, no need to check the output type flag + // and the sumInt64 can be ignored + sumDouble += value; + } + + // Called for partial calculations + @SuppressWarnings("unchecked") @Override public void finishPartial(IPointable result) throws HyracksDataException { - finish(result); + // finishPartial() has identical behavior to finish() + finishFinal(result); } - protected boolean skipStep() { - return false; + // Called for final calculations + @SuppressWarnings("unchecked") + @Override + public void finish(IPointable result) throws HyracksDataException { + // Finish + finishFinal(result); } - protected abstract void processNull(); + // Called for final calculations + @SuppressWarnings("unchecked") + private void finishFinal(IPointable result) throws HyracksDataException { + // Reset the result storage + resultStorage.reset(); + + try { + // aggType is SYSTEM_NULL + if (aggType == ATypeTag.SYSTEM_NULL) { + if (GlobalConfig.DEBUG) { + GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values."); + } - protected abstract void processSystemNull() throws HyracksDataException; + finishSystemNull(result); + } + // aggType is NULL + else if (aggType == ATypeTag.NULL) { + finishNull(result); + } + // Pass the result + else { + // Output type based on the flag + if (isUseInt64ForResult) { + aInt64.setValue(sumInt64); + aInt64Serde.serialize(aInt64, resultStorage.getDataOutput()); + result.set(resultStorage); + } else { + aDouble.setValue(sumDouble); + aDoubleSerde.serialize(aDouble, resultStorage.getDataOutput()); + result.set(resultStorage); + } + } + } catch (IOException ex) { + throw HyracksDataException.create(ex); + } + } - protected abstract void finishSystemNull() throws IOException; + // Function identifier + private FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SUM; + } }
