http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java new file mode 100644 index 0000000..25f9db2 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * This {@link IncrementalAggregator} takes the max of the fields provided in the {@link InputEvent}. 
+ * + * @since 3.1.0 + */ +@Name("MAX") +public class AggregatorMax extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 201503120332L; + + public AggregatorMax() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + Aggregate aggregate = super.getGroup(src, aggregatorIndex); + + GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + return aggregate; + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; + for (int index = 0; + index < destByte.length; + index++) { + byte tempVal = srcByte[srcIndices[index]]; + if (destByte[index] < tempVal) { + destByte[index] = tempVal; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; + for (int index = 0; + index < destShort.length; + index++) { + short tempVal = srcShort[srcIndices[index]]; + if (destShort[index] < tempVal) { + destShort[index] = tempVal; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset; + for (int index = 0; + index < destInteger.length; + index++) { + int tempVal = srcInteger[srcIndices[index]]; + if (destInteger[index] < tempVal) { + destInteger[index] = tempVal; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + int[] srcIndices = 
context.indexSubsetAggregates.fieldsLongIndexSubset; + for (int index = 0; + index < destLong.length; + index++) { + long tempVal = srcLong[srcIndices[index]]; + if (destLong[index] < tempVal) { + destLong[index] = tempVal; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; + for (int index = 0; + index < destFloat.length; + index++) { + float tempVal = srcFloat[srcIndices[index]]; + if (destFloat[index] < tempVal) { + destFloat[index] = tempVal; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; + for (int index = 0; + index < destDouble.length; + index++) { + double tempVal = srcDouble[srcIndices[index]]; + if (destDouble[index] < tempVal) { + destDouble[index] = tempVal; + } + } + } + } + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + + for (int index = 0; + index < destByte.length; + index++) { + if (destByte[index] < srcByte[index]) { + destByte[index] = srcByte[index]; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + + for (int index = 0; + index < destShort.length; + index++) { + if (destShort[index] < srcShort[index]) { + destShort[index] = srcShort[index]; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + + for (int index = 0; + index < destInteger.length; + index++) { + if 
(destInteger[index] < srcInteger[index]) { + destInteger[index] = srcInteger[index]; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + + for (int index = 0; + index < destLong.length; + index++) { + if (destLong[index] < srcLong[index]) { + destLong[index] = srcLong[index]; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + + for (int index = 0; + index < destFloat.length; + index++) { + if (destFloat[index] < srcFloat[index]) { + destFloat[index] = srcFloat[index]; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + + for (int index = 0; + index < destDouble.length; + index++) { + if (destDouble[index] < srcDouble[index]) { + destDouble[index] = srcDouble[index]; + } + } + } + } + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java new file mode 100644 index 0000000..b377e9b --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * This {@link IncrementalAggregator} takes the min of the fields provided in the {@link InputEvent}. 
+ * + * @since 3.1.0 + */ +@Name("MIN") +public class AggregatorMin extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301648L; + + public AggregatorMin() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + Aggregate aggregate = super.getGroup(src, aggregatorIndex); + + GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + return aggregate; + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; + for (int index = 0; + index < destByte.length; + index++) { + byte tempVal = srcByte[srcIndices[index]]; + if (destByte[index] > tempVal) { + destByte[index] = tempVal; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; + for (int index = 0; + index < destShort.length; + index++) { + short tempVal = srcShort[srcIndices[index]]; + if (destShort[index] > tempVal) { + destShort[index] = tempVal; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset; + for (int index = 0; + index < destInteger.length; + index++) { + int tempVal = srcInteger[srcIndices[index]]; + if (destInteger[index] > tempVal) { + destInteger[index] = tempVal; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + int[] srcIndices = 
context.indexSubsetAggregates.fieldsLongIndexSubset; + for (int index = 0; + index < destLong.length; + index++) { + long tempVal = srcLong[srcIndices[index]]; + if (destLong[index] > tempVal) { + destLong[index] = tempVal; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; + for (int index = 0; + index < destFloat.length; + index++) { + float tempVal = srcFloat[srcIndices[index]]; + if (destFloat[index] > tempVal) { + destFloat[index] = tempVal; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; + for (int index = 0; + index < destDouble.length; + index++) { + double tempVal = srcDouble[srcIndices[index]]; + if (destDouble[index] > tempVal) { + destDouble[index] = tempVal; + } + } + } + } + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + + for (int index = 0; + index < destByte.length; + index++) { + if (destByte[index] > srcByte[index]) { + destByte[index] = srcByte[index]; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + + for (int index = 0; + index < destShort.length; + index++) { + if (destShort[index] > srcShort[index]) { + destShort[index] = srcShort[index]; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + + for (int index = 0; + index < destInteger.length; + index++) { + if 
(destInteger[index] > srcInteger[index]) { + destInteger[index] = srcInteger[index]; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + + for (int index = 0; + index < destLong.length; + index++) { + if (destLong[index] > srcLong[index]) { + destLong[index] = srcLong[index]; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + + for (int index = 0; + index < destFloat.length; + index++) { + if (destFloat[index] > srcFloat[index]) { + destFloat[index] = srcFloat[index]; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + + for (int index = 0; + index < destDouble.length; + index++) { + if (destDouble[index] > srcDouble[index]) { + destDouble[index] = srcDouble[index]; + } + } + } + } + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java new file mode 100644 index 0000000..fd711cb --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * This is a convenience enum to store all the information about default {@link OTFAggregator}s + * in one place. + * + * @since 3.1.0 + */ +public enum AggregatorOTFType +{ + /** + * The average {@link OTFAggregator}. + */ + AVG(AggregatorAverage.INSTANCE); + + /** + * A map from {@link OTFAggregator} names to {@link OTFAggregator}s. + */ + public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR; + + static { + Map<String, OTFAggregator> nameToAggregator = Maps.newHashMap(); + + for (AggregatorOTFType aggType : AggregatorOTFType.values()) { + nameToAggregator.put(aggType.name(), aggType.getAggregator()); + } + + NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator); + } + + /** + * The {@link OTFAggregator} assigned to this enum. + */ + private OTFAggregator aggregator; + + /** + * Creates an {@link OTFAggregator} enum with the given aggregator. + * + * @param aggregator The {@link OTFAggregator} assigned to this enum. 
+ */ + AggregatorOTFType(OTFAggregator aggregator) + { + setAggregator(aggregator); + } + + /** + * Sets the {@link OTFAggregator} assigned to this enum. + * + * @param aggregator The {@link OTFAggregator} assigned to this enum. + */ + private void setAggregator(OTFAggregator aggregator) + { + this.aggregator = Preconditions.checkNotNull(aggregator); + } + + /** + * Gets the {@link OTFAggregator} assigned to this enum. + * + * @return The {@link OTFAggregator} assigned to this enum. + */ + public OTFAggregator getAggregator() + { + return aggregator; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java new file mode 100644 index 0000000..ff5a75d --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java @@ -0,0 +1,424 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * <p> + * This registry is used by generic dimensions computation operators and dimension stores in order to support + * plugging different + * aggregators into the operator. Subclasses of + * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry + * to support pluggable aggregators when doing dimensions computation, and Subclasses of + * AppDataSingleSchemaDimensionStoreHDHT use this class as well. + * </p> + * <p> + * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators, + * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly + * process schemas, App Data queries, and store aggregated data. + * </p> + * + * @since 3.1.0 + */ +public class AggregatorRegistry implements Serializable +{ + private static final long serialVersionUID = 20154301642L; + + /** + * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the + * default {@link AggregatorRegistry}. + */ + private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR; + /** + * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default + * {@link AggregatorRegistry}. 
+ */ + private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR; + + //Build the default maps + static { + DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR); + DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR); + } + + /** + * This is a default aggregator registry that can be used in operators. + */ + public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry( + DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR, + AggregatorIncrementalType.NAME_TO_ORDINAL); + + /** + * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not. + */ + private transient boolean setup = false; + /** + * This is a map from the class of an {@link IncrementalAggregator} to the name of that + * {@link IncrementalAggregator}. + */ + private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName; + /** + * This is a map from the name of an {@link OTFAggregator} to the list of the names of all + * {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}. + */ + private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators; + /** + * This is a map from the aggregator ID of an + * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}. + */ + private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator; + /** + * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}. + */ + private Map<String, IncrementalAggregator> nameToIncrementalAggregator; + /** + * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}. 
+ */ + private Map<String, OTFAggregator> nameToOTFAggregator; + /** + * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}. + */ + private Map<String, Integer> incrementalAggregatorNameToID; + + /** + * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator} + * + * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the + * {@link IncrementalAggregator}. + * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that + * {@link IncrementalAggregator}. + */ + private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator) + { + Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap(); + + for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) { + staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName())); + } + + return staticAggregatorNameToID; + } + + /** + * This is a helper method for computing the hash of the string. This is intended to be a static unchanging + * method since the computed hash is used for aggregator IDs which are used for persistence. + * <p> + * <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores. + * </p> + * + * @return The hash of the given string. + */ + private static int stringHash(String string) + { + int hash = 5381; + + for (int index = 0; + index < string.length(); + index++) { + int character = (int)string.charAt(index); + hash = hash * 33 + character; + } + + return hash; + } + + /** + * This constructor is present for Kryo serialization + */ + private AggregatorRegistry() + { + //for kryo + } + + /** + * <p> + * This creates an {@link AggregatorRegistry} which assigns the given names to the given + * {@link IncrementalAggregator}s and {@link OTFAggregator}s. 
This constructor also auto-generates + * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the + * fully qualified class name of each {@link IncrementalAggregator}. + * </p> + * <p> + * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the + * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored. + * </p> + * + * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator}, + * where the string is the name of an + * {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator} + * with that name. + * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the string + * is the name of + * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that + * name. + */ + public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator, + Map<String, OTFAggregator> nameToOTFAggregator) + { + this(nameToIncrementalAggregator, + nameToOTFAggregator, + autoGenIds(nameToIncrementalAggregator)); + } + + /** + * <p> + * This creates an {@link AggregatorRegistry} which assigns the given names to the given + * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each + * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs. + * </p> + * <p> + * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the + * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored. + * </p> + * + * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator}, + * where the string is the name of an + * {@link IncrementalAggregator} and the value is the + * {@link IncrementalAggregator} + * with that name. 
+ * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the + * string is the name of + * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with + * that name. + * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID + * for that + * {@link IncrementalAggregator}. + */ + public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator, + Map<String, OTFAggregator> nameToOTFAggregator, + Map<String, Integer> incrementalAggregatorNameToID) + { + setNameToIncrementalAggregator(nameToIncrementalAggregator); + setNameToOTFAggregator(nameToOTFAggregator); + + setIncrementalAggregatorNameToID(incrementalAggregatorNameToID); + + validate(); + } + + /** + * This is a helper method which is used to do validation on the maps provided to the constructor of this class. + */ + private void validate() + { + for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + + for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + + for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + } + + /** + * This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}. + * This method should be called before the {@link AggregatorRegistry} is used. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() + { + if (setup) { + //If the AggregatorRegistry was already setup. Don't set it up again. 
+ return; + } + + setup = true; + + classToIncrementalAggregatorName = Maps.newHashMap(); + + for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) { + classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey()); + } + + incrementalAggregatorIDToAggregator = Maps.newHashMap(); + + for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { + String aggregatorName = entry.getKey(); + int aggregatorID = entry.getValue(); + incrementalAggregatorIDToAggregator.put(aggregatorID, + nameToIncrementalAggregator.get(aggregatorName)); + } + + otfAggregatorToIncrementalAggregators = Maps.newHashMap(); + + for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) { + String name = entry.getKey(); + List<String> staticAggregators = Lists.newArrayList(); + + OTFAggregator dotfAggregator = nameToOTFAggregator.get(name); + + for (Class clazz : dotfAggregator.getChildAggregators()) { + staticAggregators.add(classToIncrementalAggregatorName.get(clazz)); + } + + otfAggregatorToIncrementalAggregators.put(name, staticAggregators); + } + } + + /** + * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name + * to an {@link IncrementalAggregator}. + * + * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an + * {@link IncrementalAggregator}. + */ + private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator) + { + this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator)); + } + + /** + * This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to + * an {@link OTFAggregator}. + * + * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}. 
+ */ + private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator) + { + this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator)); + } + + /** + * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator} + * registered to this registry. + * + * @param aggregatorName The aggregator name to check. + * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered to + * this registry. False otherwise. + */ + public boolean isAggregator(String aggregatorName) + { + return classToIncrementalAggregatorName.values().contains(aggregatorName) || + nameToOTFAggregator.containsKey(aggregatorName); + } + + /** + * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered + * to this registry. + * + * @param aggregatorName The aggregator name to check. + * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered + * to this registry. False otherwise. + */ + public boolean isIncrementalAggregator(String aggregatorName) + { + return classToIncrementalAggregatorName.values().contains(aggregatorName); + } + + /** + * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}. + * + * @return The mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}. + */ + public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName() + { + return classToIncrementalAggregatorName; + } + + /** + * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}. + * + * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}. 
+ */ + public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator() + { + return incrementalAggregatorIDToAggregator; + } + + /** + * This a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to + * {@link IncrementalAggregator} ID. + * + * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to + * {@link IncrementalAggregator} ID. + */ + private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID) + { + Preconditions.checkNotNull(incrementalAggregatorNameToID); + + for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + + this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID); + } + + /** + * This returns a map from the names of an {@link IncrementalAggregator}s to the corresponding ID of the + * {@link IncrementalAggregator}. + * + * @return Returns a map from the names of an {@link IncrementalAggregator} to the corresponding ID of the + * {@link IncrementalAggregator}. + */ + public Map<String, Integer> getIncrementalAggregatorNameToID() + { + return incrementalAggregatorNameToID; + } + + /** + * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}. + * + * @return The name to {@link OTFAggregator} mapping. + */ + public Map<String, OTFAggregator> getNameToOTFAggregators() + { + return nameToOTFAggregator; + } + + /** + * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of + * that {@link OTFAggregator}. + * + * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of + * that {@link OTFAggregator}. 
+ */ + public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators() + { + return otfAggregatorToIncrementalAggregators; + } + + /** + * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the {@link IncrementalAggregator}. + * + * @return The name to {@link IncrementalAggregator} mapping. + */ + public Map<String, IncrementalAggregator> getNameToIncrementalAggregator() + { + return nameToIncrementalAggregator; + } + + private static final Logger lOG = LoggerFactory.getLogger(AggregatorRegistry.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java new file mode 100644 index 0000000..c68744b --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}. + * + * @since 3.1.0 + */ +@Name("SUM") +public class AggregatorSum extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301649L; + + public AggregatorSum() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + src.used = true; + Aggregate aggregate = createAggregate(src, + context, + aggregatorIndex); + + GPOMutable value = aggregate.getAggregates(); + GPOUtils.zeroFillNumeric(value); + + return aggregate; + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + aggregateAggs(destAggs, srcAggs); + } + + public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs) + { + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + + for (int index = 0; + index < destByte.length; + index++) { + destByte[index] += srcByte[index]; + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + + for (int index = 0; + index < destShort.length; + index++) { + destShort[index] += srcShort[index]; + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] 
srcInteger = srcAggs.getFieldsInteger(); + + for (int index = 0; + index < destInteger.length; + index++) { + destInteger[index] += srcInteger[index]; + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + + for (int index = 0; + index < destLong.length; + index++) { + destLong[index] += srcLong[index]; + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + + for (int index = 0; + index < destFloat.length; + index++) { + destFloat[index] += srcFloat[index]; + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + + for (int index = 0; + index < destDouble.length; + index++) { + destDouble[index] += srcDouble[index]; + } + } + } + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + aggregateInput(destAggs, srcAggs); + } + + public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs) + { + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; + for (int index = 0; + index < destByte.length; + index++) { + destByte[index] += srcByte[srcIndices[index]]; + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; + for (int index = 0; + index < destShort.length; + index++) { + destShort[index] += srcShort[srcIndices[index]]; + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + int[] srcIndices = 
context.indexSubsetAggregates.fieldsIntegerIndexSubset; + for (int index = 0; + index < destInteger.length; + index++) { + destInteger[index] += srcInteger[srcIndices[index]]; + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset; + for (int index = 0; + index < destLong.length; + index++) { + destLong[index] += srcLong[srcIndices[index]]; + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; + for (int index = 0; + index < destFloat.length; + index++) { + destFloat[index] += srcFloat[srcIndices[index]]; + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; + for (int index = 0; + index < destDouble.length; + index++) { + destDouble[index] += srcDouble[srcIndices[index]]; + } + } + } + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } + + private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java new file mode 100644 index 0000000..9643310 --- /dev/null +++ 
b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.collect.Maps; + +import com.datatorrent.lib.appdata.schemas.Fields; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This class contains utility methods which are useful for aggregators. + * + * @since 3.1.0 + */ +public final class AggregatorUtils +{ + /** + * This is an identity type map, which maps input types to the same output types. + */ + public static final transient Map<Type, Type> IDENTITY_TYPE_MAP; + /** + * This is an identity type map, for numeric types only. This is + * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an + * int and the sum of floats is a float. 
+ */ + public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP; + + static { + Map<Type, Type> identityTypeMap = Maps.newHashMap(); + + for (Type type : Type.values()) { + identityTypeMap.put(type, type); + } + + IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap); + + Map<Type, Type> identityNumberTypeMap = Maps.newHashMap(); + + for (Type type : Type.NUMERIC_TYPES) { + identityNumberTypeMap.put(type, type); + } + + IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap); + } + + /** + * Don't instantiate this class. + */ + private AggregatorUtils() + { + //Don't instantiate this class. + } + + /** + * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields + * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator} + * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced + * by the given + * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}. + * + * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input + * data received by an aggregator. + * @param incrementalAggregator This is the + * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs + * to be computed. + * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as + * the specified input {@link FieldsDescriptor}. 
+ */ + public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, + IncrementalAggregator incrementalAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (Map.Entry<String, Type> entry : + inputFieldsDescriptor.getFieldToType().entrySet()) { + String fieldName = entry.getKey(); + Type fieldType = entry.getValue(); + Type outputType = incrementalAggregator.getOutputType(fieldType); + fieldToType.put(fieldName, outputType); + } + + return new FieldsDescriptor(fieldToType); + } + + /** + * This is a utility method which creates an output {@link FieldsDescriptor} using the field names + * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}. + * + * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used + * for the output fields descriptor. + * @param otfAggregator The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}. + * @return The output {@link FieldsDescriptor}. + */ + public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, + OTFAggregator otfAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (Map.Entry<String, Type> entry : + inputFieldsDescriptor.getFieldToType().entrySet()) { + String fieldName = entry.getKey(); + Type outputType = otfAggregator.getOutputType(); + fieldToType.put(fieldName, outputType); + } + + return new FieldsDescriptor(fieldToType); + } + + /** + * This is a utility method which creates an output {@link FieldsDescriptor} from the + * given field names and the given {@link OTFAggregator}. + * + * @param fields The names of the fields to be included in the output {@link FieldsDescriptor}. + * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}. + * @return The output {@link FieldsDescriptor}. 
+ */ + public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields, + OTFAggregator otfAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (String field : fields.getFields()) { + fieldToType.put(field, otfAggregator.getOutputType()); + } + + return new FieldsDescriptor(fieldToType); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java new file mode 100644 index 0000000..2825e0a --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsConversionContext; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; +import com.datatorrent.lib.dimensions.aggregator.AggregateEvent.Aggregator; + +/** + * <p> + * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a + * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for + * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an + * {@link IncrementalAggregator}. + * </p> + * <p> + * {@link IncrementalAggregator}s are intended to be used with subclasses of + * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which + * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator + * is provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which + * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to + * the sum aggregator. And the {@link Aggregate} event produced by the sum aggregator will contain two fields, + * one for cost and one for revenue. + * </p> + * + */ +public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate> +{ + /** + * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the + * relationship between the type of an input field and the type of its aggregate. 
For example if the aggregator takes + * a field of type int and produces an aggregate of type float, then this method would return a type of float when + * the given input type is an int. + * @param inputType The type of a field to be aggregated. + * @return The type of the aggregate corresponding to an input field of the given type. + */ + public Type getOutputType(Type inputType); + + /** + * This sets the {@link DimensionsConversionContext} for this aggregator. + * @param context The {@link DimensionsConversionContext} to set. + */ + public void setDimensionsConversionContext(DimensionsConversionContext context); + + /** + * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. + * This method returns null if this aggregator stores no metadata. + * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. + * This method returns null if this aggregator stores no metadata. + */ + public FieldsDescriptor getMetaDataDescriptor(); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java new file mode 100644 index 0000000..e5d8638 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.io.Serializable; + +import java.util.List; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * <p> + * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class + * of aggregations which use the results of incremental aggregators, which implement the + * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which + * needs to be performed on the fly is average. Average needs to be performed on the fly because average cannot be + * computed with just an existing average and a new data item; an average requires the sum of all data items, and the + * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note + * that unlike {@link IncrementalAggregator}s an {@link OTFAggregator} only has one output type. This is done + * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type. + * For example, average is a computation that you will almost always want to produce a double. But if you require + * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}. + * </p> + * <p> + * The primary usage for {@link OTFAggregator}s is in store operators which respond to queries. Currently, + * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator. 
+ * </p> + * <p> + * This interface extends {@link Serializable} because On The Fly aggregators may be set + * as properties on some operators and operator properties are required to be java serializable. + * </p> + * @since 3.1.0 + */ +public interface OTFAggregator extends Serializable +{ + public static final long serialVersionUID = 201505251039L; + + /** + * This method returns all the incremental aggregators on which this aggregator depends + * to compute its result. In the case of {@link AggregatorAverage} its child aggregators are + * {@link AggregatorCount} and {@link AggregatorSum}. + * @return All the incremental aggregators on which this aggregator depends to compute its + * result. + */ + + public List<Class<? extends IncrementalAggregator>> getChildAggregators(); + /** + * This method performs an on the fly aggregation from the given aggregates. The aggregates + * provided to this aggregator are each the result of one of this aggregator's child aggregators. + * The order in which the aggregates are passed to this method is the same as the order in + * which the child aggregators are listed in the result of the {@link #getChildAggregators} method. + * Also note that this aggregator does not aggregate one field at a time. This aggregator receives + * a batch of fields from each child aggregator, and the result of the method is also a batch of fields. + * @param aggregates These are the results of all the child aggregators. The results are in the same + * order as the child aggregators specified in the result of the {@link #getChildAggregators} method. + * @return The result of the on the fly aggregation. + */ + + public GPOMutable aggregate(GPOMutable... aggregates); + /** + * Returns the output type of the {@link OTFAggregator}. <b>Note</b> that any combination of input types + * will produce the same output type for {@link OTFAggregator}s. + * @return The output type of the {@link OTFAggregator}. 
+ */ + + public Type getOutputType(); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java new file mode 100644 index 0000000..67f8c76 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.appdata.dimensions; + + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; +import com.datatorrent.lib.appdata.schemas.TimeBucket; +import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry; + +public class CustomTimeBucketRegistryTest +{ + @Test + public void testBuildingRegistry() + { + CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); + + CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); + CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR); + CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); + + timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); + timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); + timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); + + CustomTimeBucket customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.MINUTE.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.MINUTE, customTimeBucket.getTimeBucket()); + + customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.HOUR.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.HOUR, customTimeBucket.getTimeBucket()); + + customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.DAY.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.DAY, customTimeBucket.getTimeBucket()); + + Assert.assertEquals(TimeBucket.MINUTE.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1m)); + Assert.assertEquals(TimeBucket.HOUR.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1h)); + Assert.assertEquals(TimeBucket.DAY.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1d)); + } + + @Test + public void testRegister() + { + CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); + + CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); + CustomTimeBucket 
c1h = new CustomTimeBucket(TimeBucket.HOUR); + CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); + + timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); + timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); + timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); + + int max = Integer.MIN_VALUE; + max = Math.max(max, TimeBucket.MINUTE.ordinal()); + max = Math.max(max, TimeBucket.HOUR.ordinal()); + max = Math.max(max, TimeBucket.DAY.ordinal()); + + CustomTimeBucket c5m = new CustomTimeBucket(TimeBucket.MINUTE, 5L); + + timeBucketRegistry.register(c5m); + int timeBucketId = timeBucketRegistry.getTimeBucketId(c5m); + + Assert.assertEquals(max + 1, timeBucketId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java new file mode 100644 index 0000000..43b71ef --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.appdata.dimensions; + +import java.util.Map; + +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; + +public class DimensionsEventTest +{ + @Test + public void eventKeyEqualsHashCodeTest() + { + Map<String, Type> fieldToTypeA = Maps.newHashMap(); + fieldToTypeA.put("a", Type.LONG); + fieldToTypeA.put("b", Type.STRING); + + FieldsDescriptor fdA = new FieldsDescriptor(fieldToTypeA); + + GPOMutable gpoA = new GPOMutable(fdA); + gpoA.setField("a", 1L); + gpoA.setField("b", "hello"); + + EventKey eventKeyA = new EventKey(1, 1, 1, gpoA); + + Map<String, Type> fieldToTypeB = Maps.newHashMap(); + fieldToTypeB.put("a", Type.LONG); + fieldToTypeB.put("b", Type.STRING); + + FieldsDescriptor fdB = new FieldsDescriptor(fieldToTypeB); + + GPOMutable gpoB = new GPOMutable(fdB); + gpoB.setField("a", 1L); + gpoB.setField("b", "hello"); + + EventKey eventKeyB = new EventKey(1, 1, 1, gpoB); + + Assert.assertEquals("The two hashcodes should equal", eventKeyA.hashCode(), eventKeyB.hashCode()); + Assert.assertEquals("The two event keys should equal", eventKeyA, eventKeyB); + } +}
