/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.dimensions;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
import com.datatorrent.lib.appdata.schemas.Fields;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.TimeBucket;
import com.datatorrent.lib.appdata.schemas.Type;

/**
 * <p>
 * This class defines a dimensions combination which is used by dimensions
 * computation operators and stores. A dimension combination is composed of the
 * names of the fields that constitute the key, as well as the TimeBucket under
 * which data is stored.
 * </p>
 * <p>
 * This class supports the creation of a dimensions combination from a
 * {@link TimeBucket} object and a set of fields. It also supports the creation
 * of a dimensions combination from an aggregation string. An aggregation string
 * looks like the following: <br/>
 * <br/>
 * {@code
 * "time=MINUTES:publisher:advertiser"
 * } <br/>
 * <br/>
 * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the
 * other colon separated strings represent the name of fields which comprise the
 * key for this dimension combination. When specifying a time bucket in an
 * aggregation string you must use the name of one of the TimeUnit enums.
 * </p>
 * <p>
 * One of the primary uses of a {@link DimensionsDescriptor} is for querying a
 * dimensional data store. When a query is received for a dimensional data
 * store, the query must be mapped to many things including a
 * dimensionDescriptorID. The dimensionDescriptorID is an id assigned to a class
 * of dimension combinations which share the same keys. This mapping is
 * performed by creating a {@link DimensionsDescriptor} object from the query,
 * and then using the {@link DimensionsDescriptor} object to look up the correct
 * dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is
 * necessary because a dimensionsDescriptorID is used for storage in order to
 * prevent key conflicts.
 * </p>
 *
 * @since 3.3.0
 */
public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
{
  private static final long serialVersionUID = 201506251237L;

  /**
   * Name of the reserved time field.
   */
  public static final String DIMENSION_TIME = "time";
  /**
   * Type of the reserved time field.
   */
  public static final Type DIMENSION_TIME_TYPE = Type.LONG;
  /**
   * Name of the reserved time bucket field.
   */
  public static final String DIMENSION_TIME_BUCKET = "timeBucket";
  /**
   * Type of the reserved time bucket field.
   */
  public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
  /**
   * The set of fields used for time, which are intended to be queried. Note
   * that the timeBucket field is not included here because it's not intended
   * to be queried.
   */
  public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
  /**
   * This set represents the field names which cannot be part of the user
   * defined field names in a schema for dimensions computation.
   */
  public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME, DIMENSION_TIME_BUCKET);
  /**
   * This is the equals string separator used when defining a time bucket for a
   * dimensions combination. (The misspelling "DELIMETER" is preserved because
   * these constants are public API.)
   */
  public static final String DELIMETER_EQUALS = "=";
  /**
   * This separates dimensions in the dimensions combination.
   */
  public static final String DELIMETER_SEPERATOR = ":";
  /**
   * A map from a key field to its type.
   */
  public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;

  /**
   * The time bucket used for this dimension combination. May be null when no
   * time bucket was specified.
   */
  private TimeBucket timeBucket;
  /**
   * The custom time bucket used for this dimension combination. Kept in sync
   * with {@link #timeBucket}; may be null when no time bucket was specified.
   */
  private CustomTimeBucket customTimeBucket;
  /**
   * The set of key fields which compose this dimension combination.
   */
  private Fields fields;

  static {
    Map<String, Type> dimensionFieldToType = Maps.newHashMap();

    dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
    dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);

    DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
  }

  /**
   * Constructor for kryo serialization.
   */
  private DimensionsDescriptor()
  {
    //for kryo
  }

  /**
   * Creates a dimensions descriptor (dimensions combination) with the given
   * {@link TimeBucket} and key fields.
   *
   * @param timeBucket
   *          The {@link TimeBucket} that this dimensions combination
   *          represents.
   * @param fields
   *          The key fields included in this dimensions combination.
   * @deprecated use
   *             {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket, com.datatorrent.lib.appdata.schemas.Fields)}
   *             instead.
   */
  @Deprecated
  public DimensionsDescriptor(TimeBucket timeBucket, Fields fields)
  {
    setTimeBucket(timeBucket);
    setFields(fields);
  }

  /**
   * Creates a dimensions descriptor (dimensions combination) with the given
   * {@link CustomTimeBucket} and key fields.
   *
   * @param timeBucket
   *          The {@link CustomTimeBucket} that this dimensions combination
   *          represents.
   * @param fields
   *          The key fields included in this dimensions combination.
   */
  public DimensionsDescriptor(CustomTimeBucket timeBucket, Fields fields)
  {
    setCustomTimeBucket(timeBucket);
    setFields(fields);
  }

  /**
   * Creates a dimensions descriptor (dimensions combination) with the given key
   * fields and no time bucket.
   *
   * @param fields
   *          The key fields included in this dimensions combination.
   */
  public DimensionsDescriptor(Fields fields)
  {
    setFields(fields);
  }

  /**
   * This constructor creates a dimensions descriptor (dimensions combination)
   * from the given aggregation string.
   *
   * @param aggregationString
   *          The aggregation string to use when initializing this dimensions
   *          combination.
   */
  public DimensionsDescriptor(String aggregationString)
  {
    initialize(aggregationString);
  }

  /**
   * Initializes the dimensions combination with the given aggregation string.
   *
   * @param aggregationString
   *          The aggregation string with which to initialize this dimensions
   *          combination.
   * @throws IllegalArgumentException
   *           if the reserved timeBucket field is named as a key, or if time
   *           is specified more than once.
   */
  private void initialize(String aggregationString)
  {
    String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
    Set<String> fieldSet = Sets.newHashSet();

    for (String field : fieldArray) {
      String[] fieldAndValue = field.split(DELIMETER_EQUALS);
      String fieldName = fieldAndValue[0];

      if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
        throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
      }

      if (!fieldName.equals(DIMENSION_TIME)) {
        fieldSet.add(fieldName);
      }

      if (fieldName.equals(DIMENSION_TIME)) {
        if (timeBucket != null) {
          throw new IllegalArgumentException(
              "Cannot specify time in a dimensions " + "descriptor when a timebucket is also " + "specified.");
        }

        if (fieldAndValue.length == 2) {
          // Route through setTimeBucket so that customTimeBucket is kept in
          // sync with timeBucket. Previously only timeBucket was assigned
          // here, leaving customTimeBucket null and making equals()/hashCode()
          // disagree with descriptors built via the CustomTimeBucket
          // constructor.
          setTimeBucket(TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1])));
        }
      }
    }

    fields = new Fields(fieldSet);
  }

  /**
   * This is a helper method which sets and validates the {@link TimeBucket},
   * and keeps the {@link CustomTimeBucket} consistent with it.
   *
   * @param timeBucket
   *          The {@link TimeBucket} to set and validate.
   */
  private void setTimeBucket(TimeBucket timeBucket)
  {
    Preconditions.checkNotNull(timeBucket);
    this.timeBucket = timeBucket;
    this.customTimeBucket = new CustomTimeBucket(timeBucket);
  }

  /**
   * This is a helper method which sets and validates the
   * {@link CustomTimeBucket}, and keeps the {@link TimeBucket} consistent with
   * it.
   *
   * @param customTimeBucket
   *          The {@link CustomTimeBucket} to set and validate.
   */
  private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
  {
    Preconditions.checkNotNull(customTimeBucket);
    this.customTimeBucket = customTimeBucket;
    this.timeBucket = customTimeBucket.getTimeBucket();
  }

  /**
   * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
   *
   * @return The {@link TimeBucket} for this {@link DimensionsDescriptor}
   *         object, or null if none was set.
   * @deprecated use {@link #getCustomTimeBucket()} instead.
   */
  @Deprecated
  public TimeBucket getTimeBucket()
  {
    return timeBucket;
  }

  /**
   * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor}
   * object.
   *
   * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor}
   *         object, or null if none was set.
   */
  public CustomTimeBucket getCustomTimeBucket()
  {
    return customTimeBucket;
  }

  /**
   * This is a helper method which sets and validates the set of key fields for
   * this {@link DimensionsDescriptor} object.
   *
   * @param fields
   *          The set of key fields for this {@link DimensionsDescriptor}
   *          object.
   */
  private void setFields(Fields fields)
  {
    Preconditions.checkNotNull(fields);
    this.fields = fields;
  }

  /**
   * Returns the set of key fields for this {@link DimensionsDescriptor} object.
   *
   * @return The set of key fields for this {@link DimensionsDescriptor} object.
   */
  public Fields getFields()
  {
    return fields;
  }

  /**
   * This method is used to create a new {@link FieldsDescriptor} object
   * representing this {@link DimensionsDescriptor} object from another
   * {@link FieldsDescriptor} object which defines the names and types of all
   * the available key fields.
   *
   * @param parentDescriptor
   *          The {@link FieldsDescriptor} object which defines the name and
   *          type of all the available key fields.
   * @return A {@link FieldsDescriptor} object which represents this
   *         {@link DimensionsDescriptor} (dimensions combination) derived from
   *         the given {@link FieldsDescriptor} object.
   */
  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
  {
    Map<String, Type> fieldToType = Maps.newHashMap();
    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();

    for (String field : this.fields.getFields()) {
      if (RESERVED_DIMENSION_NAMES.contains(field)) {
        continue;
      }

      // NOTE(review): a field missing from the parent descriptor maps to a
      // null type here; presumably callers guarantee all key fields are
      // present - verify against callers.
      fieldToType.put(field, parentFieldToType.get(field));
    }

    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
      fieldToType.put(DIMENSION_TIME, Type.LONG);
    }

    return new FieldsDescriptor(fieldToType);
  }

  @Override
  public int hashCode()
  {
    int hash = 7;
    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
    return hash;
  }

  @Override
  public boolean equals(Object obj)
  {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    final DimensionsDescriptor other = (DimensionsDescriptor)obj;
    // customTimeBucket may be null (e.g. a descriptor built from only a set
    // of fields), so it must be compared null-safely; the previous
    // implementation dereferenced it unconditionally and could throw NPE,
    // while hashCode() already handled null.
    if (this.customTimeBucket != other.customTimeBucket
        && (this.customTimeBucket == null || !this.customTimeBucket.equals(other.customTimeBucket))) {
      return false;
    }
    if (this.fields != other.fields && (this.fields == null || !this.fields.equals(other.fields))) {
      return false;
    }
    return true;
  }

  @Override
  public String toString()
  {
    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
  }

  @Override
  public int compareTo(DimensionsDescriptor other)
  {
    if (this == other) {
      return 0;
    }

    List<String> thisFieldList = this.getFields().getFieldsList();
    List<String> otherFieldList = other.getFields().getFieldsList();

    if (thisFieldList != otherFieldList) {
      int sizeCompare = Integer.compare(thisFieldList.size(), otherFieldList.size());

      if (sizeCompare != 0) {
        return sizeCompare;
      }

      // Sort defensive copies so that comparing two descriptors does not
      // mutate the lists returned by getFieldsList(), which may be backed by
      // internal state of the Fields objects. The previous implementation
      // sorted the returned lists in place.
      List<String> thisSorted = new ArrayList<>(thisFieldList);
      List<String> otherSorted = new ArrayList<>(otherFieldList);
      Collections.sort(thisSorted);
      Collections.sort(otherSorted);

      for (int index = 0; index < thisSorted.size(); index++) {
        int fieldCompare = thisSorted.get(index).compareTo(otherSorted.get(index));

        if (fieldCompare != 0) {
          return fieldCompare;
        }
      }
    }

    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
    CustomTimeBucket otherBucket = other.getCustomTimeBucket();

    // Null time buckets order before non-null ones.
    if (thisBucket == null && otherBucket == null) {
      return 0;
    } else if (thisBucket != null && otherBucket == null) {
      return 1;
    } else if (thisBucket == null && otherBucket != null) {
      return -1;
    } else {
      return thisBucket.compareTo(otherBucket);
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java new file mode 100644 index 0000000..de9e096 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java @@ -0,0 +1,848 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions; + +import java.io.Serializable; +import java.util.List; + +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregateEvent; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; + +/** + * <p> + * This is the base class for the events that are used for internal processing + * in the subclasses of {@link AbstractDimensionsComputationFlexible} and + * {@link DimensionsStoreHDHT}. 
+ * </p> + * <p> + * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} + * and a {@link GPOMutable} object which contains the values of aggregate + * fields. The {@link EventKey} is used to identify the dimension combination an + * event belongs to, and consequently determines what input values should be + * aggregated together. The aggregates are the actual data payload of the event + * which are to be aggregated. + * </p> + * + * @since 3.1.0 + */ +public class DimensionsEvent implements Serializable +{ + private static final long serialVersionUID = 201503231204L; + + /** + * This is the {@link GPOMutable} object which holds all the aggregates. + */ + protected GPOMutable aggregates; + /** + * This is the event key for the event. + */ + protected EventKey eventKey; + + /** + * Constructor for Kryo. + */ + private DimensionsEvent() + { + //For kryo + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and + * aggregates. + * + * @param eventKey + * The key from which to create a {@link DimensionsEvent}. + * @param aggregates + * The aggregates from which to create {@link DimensionsEvent}. + */ + public DimensionsEvent(EventKey eventKey, GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys + * The values for fields in the key. + * @param aggregates + * The values for fields in the aggregate. + * @param bucketID + * The bucketID + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. 
+ */ + public DimensionsEvent(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + /** + * This creates an event with the given data. Note, this constructor assumes + * that the bucketID will be 0. + * + * @param keys + * The value for fields in the key. + * @param aggregates + * The value for fields in the aggregate. + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. + */ + public DimensionsEvent(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + /** + * This is a helper method which sets the {@link EventKey} of the event to be + * the same as the given {@link EventKey}. + * + * @param eventKey + * The {@link EventKey} to set on this event. + */ + protected final void setEventKey(EventKey eventKey) + { + this.eventKey = new EventKey(eventKey); + } + + /** + * This is a helper method which sets the aggregates for this event. + * + * @param aggregates + * The aggregates for this event. + */ + protected final void setAggregates(GPOMutable aggregates) + { + Preconditions.checkNotNull(aggregates); + this.aggregates = aggregates; + } + + /** + * This is a helper method which returns the aggregates for this event. + * + * @return The helper method which returns the aggregates for this event. + */ + public GPOMutable getAggregates() + { + return aggregates; + } + + /** + * Returns the {@link EventKey} for this event. + * + * @return The {@link EventKey} for this event. 
+ */ + public EventKey getEventKey() + { + return eventKey; + } + + /** + * This is a convenience method which returns the values of the key fields in + * this event's {@link EventKey}. + * + * @return The values of the key fields in this event's {@link EventKey}. + */ + public GPOMutable getKeys() + { + return eventKey.getKey(); + } + + /** + * This is a convenience method which returns the schemaID of this event's + * {@link EventKey}. + * + * @return The schemaID of this event's {@link EventKey}. + */ + public int getSchemaID() + { + return eventKey.getSchemaID(); + } + + /** + * Returns the id of the dimension descriptor (key combination) for which this + * event contains data. + * + * @return The id of the dimension descriptor (key combination) for which this + * event contains data. + */ + public int getDimensionDescriptorID() + { + return eventKey.getDimensionDescriptorID(); + } + + /** + * Returns the id of the aggregator which is applied to this event's data. + * + * @return Returns the id of the aggregator which is applied to this event's + * data. + */ + public int getAggregatorID() + { + return eventKey.getAggregatorID(); + } + + /** + * Returns the bucketID assigned to this event. The bucketID is useful for + * this event in the case that the event is sent to a partitioned HDHT + * operator. Each partitioned HDHT operator can use the bucketIDs for the + * buckets it writes to as a partition key. + * + * @return The bucketID assigned to this event. + */ + public int getBucketID() + { + return eventKey.getBucketID(); + } + + /** + * This is a utility method which copies the given src event to the given + * destination event. + * + * @param aeDest + * The destination event. + * @param aeSrc + * The source event. 
+ */ + public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc) + { + GPOMutable destAggs = aeDest.getAggregates(); + GPOMutable srcAggs = aeSrc.getAggregates(); + + if (srcAggs.getFieldsBoolean() != null) { + System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0, + srcAggs.getFieldsBoolean().length); + } + + if (srcAggs.getFieldsCharacter() != null) { + System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0, + srcAggs.getFieldsCharacter().length); + } + + if (srcAggs.getFieldsString() != null) { + System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length); + } + + if (srcAggs.getFieldsShort() != null) { + System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length); + } + + if (srcAggs.getFieldsInteger() != null) { + System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0, + srcAggs.getFieldsInteger().length); + } + + if (srcAggs.getFieldsLong() != null) { + System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length); + } + + if (srcAggs.getFieldsFloat() != null) { + System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length); + } + + if (srcAggs.getFieldsDouble() != null) { + System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length); + } + } + + /** + * <p> + * The {@link EventKey} represents a dimensions combination for a dimensions + * event. It contains the keys and values which define a dimensions + * combination. It's very similar to a {@link DimensionsDescriptor} which is + * also used to define part of a dimensions combination. 
The difference + * between the two is that a {@link DimensionsDescriptor} only contains what + * keys are included in the combination, not the values of those keys (which + * the {@link EventKey} has. + * </p> + * <p> + * In addition to holding the keys in a dimensions combination and their + * values, the event key holds some meta information. The meta information + * included and their purposes are the following: + * <ul> + * <li><b>bucketID:</b> This is set when the dimension store responsible for + * storing the data is partitioned. In that case the bucketID is used as the + * partitionID.</li> + * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that + * this {@link EventKey} corresponds to .</li> + * <li><b>dimensionDescriptorID:</b> This is the id of the + * {@link DimensionsDescriptor} that this {@link EventKey} corresponds to. + * </li> + * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to + * aggregate the values associated with this {@link EventKey} in a + * {@link DimensionsEvent}.</li> + * </ul> + * </p> + */ + public static class EventKey implements Serializable + { + private static final long serialVersionUID = 201503231205L; + + /** + * The bucketID assigned to this event key. + */ + private int bucketID; + /** + * The schemaID corresponding to the {@link DimensionalSchema} that this + * {@link EventKey} corresponds to. + */ + private int schemaID; + /** + * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the + * corresponding {@link DimensionalSchema}. + */ + private int dimensionDescriptorID; + /** + * The id of the aggregator which should be used to aggregate the values + * corresponding to this {@link EventKey}. + */ + private int aggregatorID; + /** + * The values of the key fields. + */ + private GPOMutable key; + + /** + * Constructor for serialization. + */ + private EventKey() + { + //For kryo + } + + /** + * Copy constructor. 
+ * + * @param eventKey + * The {@link EventKey} whose data will be copied. + */ + public EventKey(EventKey eventKey) + { + this.bucketID = eventKey.bucketID; + this.schemaID = eventKey.schemaID; + this.dimensionDescriptorID = eventKey.dimensionDescriptorID; + this.aggregatorID = eventKey.aggregatorID; + + this.key = new GPOMutable(eventKey.getKey()); + } + + /** + * Creates an event key with the given data. + * + * @param bucketID + * The bucketID assigned to this {@link EventKey}. + * @param schemaID + * The schemaID of the corresponding {@link DimensionalSchema}. + * @param dimensionDescriptorID + * The dimensionDescriptorID of the corresponding + * {@link DimensionDescriptor} in the {@link DimensionalSchema}. + * @param aggregatorID + * The id of the aggregator which should be used to aggregate the + * values corresponding to this {@link EventKey}. + * @param key + * The values of the keys. + */ + public EventKey(int bucketID, int schemaID, int dimensionDescriptorID, int aggregatorID, GPOMutable key) + { + setBucketID(bucketID); + setSchemaID(schemaID); + setDimensionDescriptorID(dimensionDescriptorID); + setAggregatorID(aggregatorID); + setKey(key); + } + + /** + * Creates an event key with the given data. This constructor assumes that + * the bucketID will be 0. + * + * @param schemaID + * The schemaID of the corresponding {@link DimensionalSchema}. + * @param dimensionDescriptorID + * The dimensionDescriptorID of the corresponding + * {@link DimensionDescriptor}. + * @param aggregatorID + * The id of the aggregator which should be used to aggregate the + * values corresponding to this {@link EventKey}. + * @param key + * The values of the keys. + */ + public EventKey(int schemaID, int dimensionDescriptorID, int aggregatorID, GPOMutable key) + { + setSchemaID(schemaID); + setDimensionDescriptorID(dimensionDescriptorID); + setAggregatorID(aggregatorID); + setKey(key); + } + + /** + * Sets the dimension descriptor ID. 
+ * + * @param dimensionDescriptorID + * The dimension descriptor ID to set. + */ + private void setDimensionDescriptorID(int dimensionDescriptorID) + { + this.dimensionDescriptorID = dimensionDescriptorID; + } + + /** + * Returns the dimension descriptor ID. + * + * @return The dimension descriptor ID. + */ + public int getDimensionDescriptorID() + { + return dimensionDescriptorID; + } + + /** + * Returns the aggregatorID. + * + * @return The aggregatorID. + */ + public int getAggregatorID() + { + return aggregatorID; + } + + /** + * Sets the aggregatorID. + * + * @param aggregatorID + * The aggregatorID to set. + */ + private void setAggregatorID(int aggregatorID) + { + this.aggregatorID = aggregatorID; + } + + /** + * Returns the schemaID. + * + * @return The schemaID to set. + */ + public int getSchemaID() + { + return schemaID; + } + + /** + * Sets the schemaID. + * + * @param schemaID + * The schemaID to set. + */ + private void setSchemaID(int schemaID) + { + this.schemaID = schemaID; + } + + /** + * Returns the key values. + * + * @return The key values. + */ + public GPOMutable getKey() + { + return key; + } + + /** + * Sets the bucektID. + * + * @param bucketID + * The bucketID. + */ + private void setBucketID(int bucketID) + { + this.bucketID = bucketID; + } + + /** + * Gets the bucketID. + * + * @return The bucketID. + */ + public int getBucketID() + { + return bucketID; + } + + /** + * Sets the key values. + * + * @param key + * The key values to set. + */ + private void setKey(GPOMutable key) + { + Preconditions.checkNotNull(key); + this.key = key; + } + + @Override + public int hashCode() + { + int hash = 3; + hash = 97 * hash + this.bucketID; + hash = 97 * hash + this.schemaID; + hash = 97 * hash + this.dimensionDescriptorID; + hash = 97 * hash + this.aggregatorID; + hash = 97 * hash + (this.key != null ? 
this.key.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + final EventKey other = (EventKey)obj; + + if (this.bucketID != other.bucketID) { + return false; + } + + if (this.schemaID != other.schemaID) { + return false; + } + + if (this.dimensionDescriptorID != other.dimensionDescriptorID) { + return false; + } + + if (this.aggregatorID != other.aggregatorID) { + return false; + } + + if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) { + return false; + } + + return true; + } + + @Override + public String toString() + { + return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID + + ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}'; + } + + public static List<EventKey> createEventKeys(int schemaId, int dimensionsDescriptorId, int aggregatorId, + List<GPOMutable> keys) + { + List<EventKey> eventKeys = Lists.newArrayList(); + + for (GPOMutable key : keys) { + eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key)); + } + + return eventKeys; + } + } + + @Override + public int hashCode() + { + int hash = 5; + hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0); + hash = 79 * hash + (this.eventKey != null ? 
this.eventKey.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) { + return false; + } + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + + public static class InputEvent extends DimensionsEvent + { + private static final long serialVersionUID = 201506210406L; + public boolean used = false; + + private InputEvent() + { + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and + * aggregates. + * + * @param eventKey + * The key from which to create a {@link DimensionsEvent}. + * @param aggregates + * The aggregates from which to create {@link DimensionsEvent}. + */ + public InputEvent(EventKey eventKey, GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys + * The values for fields in the key. + * @param aggregates + * The values for fields in the aggregate. + * @param bucketID + * The bucketID + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. + */ + public InputEvent(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + /** + * This creates an event with the given data. Note, this constructor assumes + * that the bucketID will be 0. 
+ * + * @param keys + * The value for fields in the key. + * @param aggregates + * The value for fields in the aggregate. + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. + */ + public InputEvent(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + @Override + public int hashCode() + { + return GPOUtils.hashcode(this.getKeys()); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + } + + public static class Aggregate extends DimensionsEvent implements AggregateEvent + { + private static final long serialVersionUID = 201506190110L; + + /** + * This is the aggregatorIndex assigned to this event. + */ + protected int aggregatorIndex; + private GPOMutable metaData; + + public Aggregate() + { + //for kryo and for extending classes + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and + * aggregates. + * + * @param eventKey + * The key from which to create a {@link DimensionsEvent}. + * @param aggregates + * The aggregates from which to create {@link DimensionsEvent}. 
+ */ + public Aggregate(EventKey eventKey, GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + public Aggregate(EventKey eventKey, GPOMutable aggregates, GPOMutable metaData) + { + super(eventKey, aggregates); + + this.metaData = metaData; + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys + * The values for fields in the key. + * @param aggregates + * The values for fields in the aggregate. + * @param bucketID + * The bucketID + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. + */ + public Aggregate(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + public Aggregate(GPOMutable keys, GPOMutable aggregates, GPOMutable metaData, int bucketID, int schemaID, + int dimensionDescriptorID, int aggregatorIndex) + { + this(keys, aggregates, bucketID, schemaID, dimensionDescriptorID, aggregatorIndex); + + this.metaData = metaData; + } + + /** + * This creates an event with the given data. Note, this constructor assumes + * that the bucketID will be 0. + * + * @param keys + * The value for fields in the key. + * @param aggregates + * The value for fields in the aggregate. + * @param schemaID + * The schemaID. + * @param dimensionDescriptorID + * The dimensionsDescriptorID. + * @param aggregatorIndex + * The aggregatorIndex assigned to this event by the unifier. 
+ */ + public Aggregate(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys); + setAggregates(aggregates); + } + + public Aggregate(GPOMutable keys, GPOMutable aggregates, GPOMutable metaData, int schemaID, + int dimensionDescriptorID, int aggregatorIndex) + { + this(keys, aggregates, schemaID, dimensionDescriptorID, aggregatorIndex); + + this.metaData = metaData; + } + + public void setMetaData(GPOMutable metaData) + { + this.metaData = metaData; + } + + public GPOMutable getMetaData() + { + return metaData; + } + + public void setAggregatorIndex(int aggregatorIndex) + { + this.aggregatorIndex = aggregatorIndex; + } + + @Override + public int getAggregatorIndex() + { + return aggregatorIndex; + } + + @Override + public int hashCode() + { + return GPOUtils.hashcode(this.getKeys()); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java new file mode 100644 index 0000000..b25390e --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java @@ -0,0 +1,191 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.dimensions.aggregator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.dimensions.DimensionsConversionContext;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;

import com.google.common.base.Preconditions;

import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;

/**
 * <p>
 * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
 * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
 * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
 * {@link IncrementalAggregator}.
 * </p>
 * <p>
 * {@link IncrementalAggregator}s are intended to be used with subclasses of
 * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
 * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
 * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
 * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
 * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields,
 * one for cost and one for revenue.
 * </p>
 *
 */
public abstract class AbstractIncrementalAggregator implements IncrementalAggregator
{
  private static final long serialVersionUID = 201506211153L;

  /**
   * The conversion context for this aggregator.
   */
  protected DimensionsConversionContext context;

  public AbstractIncrementalAggregator()
  {
  }

  @Override
  public void setDimensionsConversionContext(DimensionsConversionContext context)
  {
    // Context must be present before getGroup/hashCode/equals are called.
    this.context = Preconditions.checkNotNull(context);
  }

  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    // Mark the input as consumed so upstream pooling/reuse logic knows it was aggregated.
    src.used = true;
    Aggregate aggregate = createAggregate(src,
        context,
        aggregatorIndex);
    return aggregate;
  }

  @Override
  public int hashCode(InputEvent inputEvent)
  {
    long timestamp = -1L;
    // Time participates in the hash only when both the input timestamp and the
    // output time bucket are configured in the conversion context.
    boolean hasTime = this.context.inputTimestampIndex != -1
        && this.context.outputTimebucketIndex != -1;

    if (hasTime) {
      // Temporarily replace the raw timestamp with its bucket-rounded value so that
      // all events in the same time bucket hash identically.
      timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex];
      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]
          = this.context.dd.getCustomTimeBucket().roundDown(timestamp);
    }

    // Hash only the subset of key fields relevant to this dimensions combination.
    int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys);

    if (hasTime) {
      // Restore the original timestamp — the InputEvent is shared and must not be mutated.
      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp;
    }

    return hashCode;
  }

  @Override
  public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2)
  {
    long timestamp1 = 0;
    long timestamp2 = 0;

    if (context.inputTimestampIndex != -1) {
      // Same swap-compare-restore trick as hashCode: compare bucket-rounded timestamps
      // so events within the same time bucket compare equal.
      timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex];
      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] =
          context.dd.getCustomTimeBucket().roundDown(timestamp1);

      timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex];
      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] =
          context.dd.getCustomTimeBucket().roundDown(timestamp2);
    }

    boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(),
        inputEvent1.getKeys(),
        context.indexSubsetKeys);

    if (context.inputTimestampIndex != -1) {
      // Restore original timestamps on both events.
      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1;
      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2;
    }

    return equals;
  }

  /**
   * Creates an {@link Aggregate} from the given {@link InputEvent}.
   *
   * @param inputEvent The {@link InputEvent} to unpack into an {@link Aggregate}.
   * @param context The conversion context required to transform the {@link InputEvent} into
   * the correct {@link Aggregate}.
   * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}.
   * @return The converted {@link Aggregate}.
   */
  public static Aggregate createAggregate(InputEvent inputEvent,
      DimensionsConversionContext context,
      int aggregatorIndex)
  {
    // Fresh aggregate payload sized by the output aggregate descriptor; values are
    // filled in later by the concrete aggregator.
    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
    EventKey eventKey = createEventKey(inputEvent,
        context,
        aggregatorIndex);

    Aggregate aggregate = new Aggregate(eventKey,
        aggregates);
    aggregate.setAggregatorIndex(aggregatorIndex);

    return aggregate;
  }

  /**
   * Creates an {@link EventKey} from the given {@link InputEvent}.
   *
   * @param inputEvent The {@link InputEvent} to extract an {@link EventKey} from.
   * @param context The conversion context required to extract the {@link EventKey} from
   * the given {@link InputEvent}.
   * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}.
   * @return The {@link EventKey} extracted from the given {@link InputEvent}.
   */
  public static EventKey createEventKey(InputEvent inputEvent,
      DimensionsConversionContext context,
      int aggregatorIndex)
  {
    // Copy only the key fields belonging to this dimensions combination.
    GPOMutable keys = new GPOMutable(context.keyDescriptor);
    GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys);

    if (context.outputTimebucketIndex >= 0) {
      CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket();

      // Record which time bucket this key belongs to, and round the timestamp
      // down to the start of that bucket.
      keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId(
          timeBucket);
      keys.getFieldsLong()[context.outputTimestampIndex] =
          timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]);
    }

    EventKey eventKey = new EventKey(context.schemaID,
        context.dimensionsDescriptorID,
        context.aggregatorID,
        keys);

    return eventKey;
  }

  private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
new file mode 100644
index 0000000..6000c90
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
@@ -0,0 +1,38 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import it.unimi.dsi.fastutil.Hash; + +/** + * @since 3.3.0 + */ +public interface AggregateEvent +{ + int getAggregatorIndex(); + + public static interface Aggregator<EVENT, AGGREGATE extends AggregateEvent> extends Hash.Strategy<EVENT> + { + AGGREGATE getGroup(EVENT src, int aggregatorIndex); + + void aggregate(AGGREGATE dest, EVENT src); + + void aggregate(AGGREGATE dest, AGGREGATE src); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java new file mode 100644 index 0000000..e5b6d83 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.Fields; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This is the average {@link OTFAggregator}. + * + * @since 3.1.0 + */ +@Name("AVG") +public class AggregatorAverage implements OTFAggregator +{ + private static final long serialVersionUID = 20154301644L; + + /** + * The array index of the sum aggregates in the argument list of the {@link #aggregate} function. + */ + public static int SUM_INDEX = 0; + /** + * The array index of the count aggregates in the argument list of the {@link #aggregate} function. + */ + public static int COUNT_INDEX = 1; + /** + * The singleton instance of this class. + */ + public static final AggregatorAverage INSTANCE = new AggregatorAverage(); + + /** + * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on. + */ + public static final transient List<Class<? 
extends IncrementalAggregator>> CHILD_AGGREGATORS = + ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(), + AggregatorIncrementalType.COUNT.getAggregator().getClass()); + + /** + * Constructor for singleton pattern. + */ + protected AggregatorAverage() + { + //Do nothing + } + + @Override + public List<Class<? extends IncrementalAggregator>> getChildAggregators() + { + return CHILD_AGGREGATORS; + } + + @Override + public GPOMutable aggregate(GPOMutable... aggregates) + { + Preconditions.checkArgument(aggregates.length == getChildAggregators().size(), + "The number of arguments " + aggregates.length + + " should be the same as the number of child aggregators " + getChildAggregators().size()); + + GPOMutable sumAggregation = aggregates[SUM_INDEX]; + GPOMutable countAggregation = aggregates[COUNT_INDEX]; + + FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor(); + Fields fields = fieldsDescriptor.getFields(); + GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this)); + + long count = countAggregation.getFieldsLong()[0]; + + for (String field : fields.getFields()) { + Type type = sumAggregation.getFieldDescriptor().getType(field); + + switch (type) { + case BYTE: { + double val = ((double)sumAggregation.getFieldByte(field)) / + ((double)count); + result.setField(field, val); + break; + } + case SHORT: { + double val = ((double)sumAggregation.getFieldShort(field)) / + ((double)count); + result.setField(field, val); + break; + } + case INTEGER: { + double val = ((double)sumAggregation.getFieldInt(field)) / + ((double)count); + result.setField(field, val); + break; + } + case LONG: { + double val = ((double)sumAggregation.getFieldLong(field)) / + ((double)count); + result.setField(field, val); + break; + } + case FLOAT: { + double val = sumAggregation.getFieldFloat(field) / + ((double)count); + result.setField(field, val); + break; + } + case DOUBLE: { + double val = 
sumAggregation.getFieldDouble(field) / + ((double)count); + result.setField(field, val); + break; + } + default: { + throw new UnsupportedOperationException("The type " + type + " is not supported."); + } + } + } + + return result; + } + + @Override + public Type getOutputType() + { + return Type.DOUBLE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java new file mode 100644 index 0000000..61d12a8 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered. + * + * @since 3.1.0 + */ +@Name("COUNT") +public class AggregatorCount extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301645L; + + /** + * This is a map whose keys represent input types and whose values + * represent the corresponding output types. 
+ */ + public static final transient Map<Type, Type> TYPE_CONVERSION_MAP; + + static { + Map<Type, Type> typeConversionMap = Maps.newHashMap(); + + for (Type type : Type.values()) { + typeConversionMap.put(type, Type.LONG); + } + + TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap); + } + + public AggregatorCount() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + src.used = true; + GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); + GPOMutable keys = new GPOMutable(context.keyDescriptor); + GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys); + + EventKey eventKey = createEventKey(src, + context, + aggregatorIndex); + + long[] longFields = aggregates.getFieldsLong(); + + for (int index = 0; + index < longFields.length; + index++) { + longFields[index] = 0; + } + + return new Aggregate(eventKey, + aggregates); + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + long[] fieldsLong = dest.getAggregates().getFieldsLong(); + + for (int index = 0; + index < fieldsLong.length; + index++) { + //increment count + fieldsLong[index]++; + } + } + + @Override + public void aggregate(Aggregate destAgg, Aggregate srcAgg) + { + long[] destLongs = destAgg.getAggregates().getFieldsLong(); + long[] srcLongs = srcAgg.getAggregates().getFieldsLong(); + + for (int index = 0; + index < destLongs.length; + index++) { + //aggregate count + destLongs[index] += srcLongs[index]; + } + } + + @Override + public Type getOutputType(Type inputType) + { + return TYPE_CONVERSION_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java new file mode 100644 index 0000000..744cd1c --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.gpo.Serde; +import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor; +import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable; +import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +@Name("CUM_SUM") +/** + * @since 3.1.0 + */ + +public class AggregatorCumSum extends AggregatorSum +{ + private static final long serialVersionUID = 201506280518L; + + public static final int KEY_FD_INDEX = 0; + public static final int AGGREGATE_FD_INDEX = 1; + public static final int KEYS_INDEX = 2; + public static final int AGGREGATES_INDEX = 3; + + public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR; + + static { + Map<String, Type> fieldToType = Maps.newHashMap(); + fieldToType.put("fdkeys", Type.OBJECT); + fieldToType.put("fdvalues", Type.OBJECT); + fieldToType.put("keys", Type.OBJECT); + fieldToType.put("values", Type.OBJECT); + + Map<String, Serde> fieldToSerde = Maps.newHashMap(); + fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE); + fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE); + fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE); + fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE); + + META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType, + fieldToSerde, + new PayloadFix()); + } + + public AggregatorCumSum() + { + } + + @Override + public Aggregate getGroup(InputEvent 
src, int aggregatorIndex) + { + src.used = true; + Aggregate agg = createAggregate(src, + context, + aggregatorIndex); + + GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + GPOMutable metaData = new GPOMutable(getMetaDataDescriptor()); + + GPOMutable fullKey = new GPOMutable(src.getKeys()); + + if (context.inputTimestampIndex >= 0) { + fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L; + } + + List<GPOMutable> keys = Lists.newArrayList(fullKey); + + GPOMutable value = new GPOMutable(agg.getAggregates()); + List<GPOMutable> values = Lists.newArrayList(value); + + metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor(); + metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor(); + metaData.getFieldsObject()[KEYS_INDEX] = keys; + metaData.getFieldsObject()[AGGREGATES_INDEX] = values; + agg.setMetaData(metaData); + + return agg; + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + @SuppressWarnings("unchecked") + List<GPOMutable> destKeys = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> destAggregates = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + long timestamp = 0L; + + if (context.inputTimestampIndex >= 0) { + timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex]; + src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L; + } + + if (!contains(destKeys, src.getKeys())) { + destKeys.add(new GPOMutable(src.getKeys())); + + GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); + GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates); + + destAggregates.add(aggregates); + + this.aggregateAggs(dest.getAggregates(), aggregates); + } + + if (context.inputTimestampIndex >= 0) { + src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp; + } + } + + @Override + 
public void aggregate(Aggregate dest, Aggregate src) + { + dest.getMetaData().applyObjectPayloadFix(); + src.getMetaData().applyObjectPayloadFix(); + + @SuppressWarnings("unchecked") + List<GPOMutable> destKeys = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> srcKeys = + (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> destAggregates = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> srcAggregates = + (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + List<GPOMutable> newKeys = Lists.newArrayList(); + List<GPOMutable> newAggs = Lists.newArrayList(); + + for (int index = 0; + index < srcKeys.size(); + index++) { + GPOMutable currentSrcKey = srcKeys.get(index); + GPOMutable currentSrcAgg = srcAggregates.get(index); + + if (!contains(destKeys, currentSrcKey)) { + newKeys.add(currentSrcKey); + newAggs.add(currentSrcAgg); + + this.aggregateAggs(dest.getAggregates(), currentSrcAgg); + } + } + + destKeys.addAll(newKeys); + destAggregates.addAll(newAggs); + } + + private boolean contains(List<GPOMutable> mutables, GPOMutable mutable) + { + for (int index = 0; + index < mutables.size(); + index++) { + GPOMutable mutableFromList = mutables.get(index); + + if (GPOUtils.equals(mutableFromList, mutable)) { + return true; + } + } + + return false; + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return META_DATA_FIELDS_DESCRIPTOR; + } + + public static class PayloadFix implements SerdeObjectPayloadFix + { + @Override + public void fix(Object[] objects) + { + FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX]; + FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX]; 
+ @SuppressWarnings("unchecked") + List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX]; + + fix(keyfd, keyMutables); + fix(valuefd, aggregateMutables); + } + + private void fix(FieldsDescriptor fd, List<GPOMutable> mutables) + { + for (int index = 0; + index < mutables.size(); + index++) { + mutables.get(index).setFieldDescriptor(fd); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java new file mode 100644 index 0000000..a6ceb65 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.apex.malhar.lib.dimensions.aggregator;

import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;

/**
 * <p>
 * Incremental aggregator that keeps the aggregate built from the first
 * {@link InputEvent} it encounters; every subsequent {@link InputEvent} is
 * ignored.
 * </p>
 * <p>
 * <b>Note:</b> when partial aggregates are merged in a unifier there is no
 * ordering information available, so one of them is kept arbitrarily as the
 * "first".
 * </p>
 *
 * @since 3.1.0
 */
@Name("FIRST")
public class AggregatorFirst extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301646L;

  public AggregatorFirst()
  {
    //Do nothing
  }

  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    // Seed the new group with the first event's values; the aggregate()
    // overrides below never overwrite them.
    Aggregate group = super.getGroup(src, aggregatorIndex);

    GPOUtils.indirectCopy(group.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);

    return group;
  }

  @Override
  public Type getOutputType(Type inputType)
  {
    // FIRST passes values through unchanged, so the output type mirrors
    // the input type.
    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
  }

  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    // Intentionally a no-op: the first value wins.
  }

  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    // Intentionally a no-op: keep the aggregate already chosen as first.
  }

  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    // FIRST requires no additional meta data.
    return null;
  }
} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java ---------------------------------------------------------------------- diff --git
a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java new file mode 100644 index 0000000..c82e6ee --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.apex.malhar.lib.dimensions.aggregator;

import java.util.Collections;
import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
 * Registry of the built-in incremental aggregators. Each enum constant pairs
 * an aggregator name with a shared {@link IncrementalAggregator} instance, and
 * the static maps expose name-to-ordinal and name-to-aggregator lookups.
 *
 * @since 3.1.0
 */
public enum AggregatorIncrementalType
{
  SUM(new AggregatorSum()),
  MIN(new AggregatorMin()),
  MAX(new AggregatorMax()),
  COUNT(new AggregatorCount()),
  LAST(new AggregatorLast()),
  FIRST(new AggregatorFirst()),
  CUM_SUM(new AggregatorCumSum());

  /** Unmodifiable map from aggregator name to enum ordinal. */
  public static final Map<String, Integer> NAME_TO_ORDINAL;
  /** Unmodifiable map from aggregator name to the shared aggregator instance. */
  public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;

  private IncrementalAggregator aggregator;

  static {
    Map<String, Integer> nameToOrdinal = Maps.newHashMap();
    Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap();

    for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) {
      nameToOrdinal.put(aggType.name(), aggType.ordinal());
      nameToAggregator.put(aggType.name(), aggType.getAggregator());
    }

    NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal);
    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
  }

  AggregatorIncrementalType(IncrementalAggregator aggregator)
  {
    setAggregator(aggregator);
  }

  private void setAggregator(IncrementalAggregator aggregator)
  {
    Preconditions.checkNotNull(aggregator);
    this.aggregator = aggregator;
  }

  /**
   * @return the shared aggregator instance backing this enum constant
   */
  public IncrementalAggregator getAggregator()
  {
    return aggregator;
  }

  // NOTE(review): the unused private SLF4J Logger (and its imports) present in
  // the original has been removed; nothing in this file logged through it.
} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java new file mode 100644 index 0000000..b679098 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * <p> + * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous + * {@link InputEvent}s are ignored. + * </p> + * <p> + * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so + * one is picked arbitrarily to be the last. 
* </p>
 *
 * @since 3.1.0
 */
@Name("LAST")
public class AggregatorLast extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301647L;

  public AggregatorLast()
  {
    //Do nothing
  }

  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    // Seed the new group with the event's values; later events simply
    // overwrite them in aggregate() below.
    Aggregate group = super.getGroup(src, aggregatorIndex);

    GPOUtils.indirectCopy(group.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);

    return group;
  }

  @Override
  public Type getOutputType(Type inputType)
  {
    // LAST passes values through unchanged, so the output type mirrors
    // the input type.
    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
  }

  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    // The most recent event wins: overwrite dest with src's values.
    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
  }

  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    // Arbitrarily treat src as the "last" aggregate and copy it over dest.
    DimensionsEvent.copy(dest, src);
  }

  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    // LAST requires no additional meta data.
    return null;
  }
} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java new file mode 100644 index 0000000..8aca886 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java @@ -0,0 +1,266 @@ +/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This {@link IncrementalAggregator} takes the max of the fields provided in the {@link InputEvent}. 
*
 * @since 3.1.0
 */
@Name("MAX")
public class AggregatorMax extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 201503120332L;

  public AggregatorMax()
  {
    //Do nothing
  }

  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    // Seed the fresh group with the first event's values so the max
    // comparisons below always start from a real observation.
    Aggregate group = super.getGroup(src, aggregatorIndex);

    GPOUtils.indirectCopy(group.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);

    return group;
  }

  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    // Each primitive family lives in its own backing array (null when that
    // family has no aggregated fields). The src arrays are addressed through
    // the subset index mapping, since an InputEvent carries more fields than
    // the aggregated subset.
    GPOMutable destAggs = dest.getAggregates();
    GPOMutable srcAggs = src.getAggregates();

    byte[] destByte = destAggs.getFieldsByte();
    if (destByte != null) {
      byte[] srcByte = srcAggs.getFieldsByte();
      int[] map = context.indexSubsetAggregates.fieldsByteIndexSubset;
      for (int i = 0; i < destByte.length; i++) {
        destByte[i] = (byte)Math.max(destByte[i], srcByte[map[i]]);
      }
    }

    short[] destShort = destAggs.getFieldsShort();
    if (destShort != null) {
      short[] srcShort = srcAggs.getFieldsShort();
      int[] map = context.indexSubsetAggregates.fieldsShortIndexSubset;
      for (int i = 0; i < destShort.length; i++) {
        destShort[i] = (short)Math.max(destShort[i], srcShort[map[i]]);
      }
    }

    int[] destInteger = destAggs.getFieldsInteger();
    if (destInteger != null) {
      int[] srcInteger = srcAggs.getFieldsInteger();
      int[] map = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
      for (int i = 0; i < destInteger.length; i++) {
        destInteger[i] = Math.max(destInteger[i], srcInteger[map[i]]);
      }
    }

    long[] destLong = destAggs.getFieldsLong();
    if (destLong != null) {
      long[] srcLong = srcAggs.getFieldsLong();
      int[] map = context.indexSubsetAggregates.fieldsLongIndexSubset;
      for (int i = 0; i < destLong.length; i++) {
        destLong[i] = Math.max(destLong[i], srcLong[map[i]]);
      }
    }

    // float/double deliberately use an explicit "<" comparison rather than
    // Math.max to preserve the original behavior for NaN and -0.0/+0.0
    // (Math.max propagates NaN and prefers +0.0; "<" keeps dest unchanged).
    float[] destFloat = destAggs.getFieldsFloat();
    if (destFloat != null) {
      float[] srcFloat = srcAggs.getFieldsFloat();
      int[] map = context.indexSubsetAggregates.fieldsFloatIndexSubset;
      for (int i = 0; i < destFloat.length; i++) {
        float candidate = srcFloat[map[i]];
        if (destFloat[i] < candidate) {
          destFloat[i] = candidate;
        }
      }
    }

    double[] destDouble = destAggs.getFieldsDouble();
    if (destDouble != null) {
      double[] srcDouble = srcAggs.getFieldsDouble();
      int[] map = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
      for (int i = 0; i < destDouble.length; i++) {
        double candidate = srcDouble[map[i]];
        if (destDouble[i] < candidate) {
          destDouble[i] = candidate;
        }
      }
    }
  }

  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    // Same element-wise max as above, but both sides are already aggregates
    // with identical layouts, so the arrays are addressed directly.
    GPOMutable destAggs = dest.getAggregates();
    GPOMutable srcAggs = src.getAggregates();

    byte[] destByte = destAggs.getFieldsByte();
    if (destByte != null) {
      byte[] srcByte = srcAggs.getFieldsByte();
      for (int i = 0; i < destByte.length; i++) {
        destByte[i] = (byte)Math.max(destByte[i], srcByte[i]);
      }
    }

    short[] destShort = destAggs.getFieldsShort();
    if (destShort != null) {
      short[] srcShort = srcAggs.getFieldsShort();
      for (int i = 0; i < destShort.length; i++) {
        destShort[i] = (short)Math.max(destShort[i], srcShort[i]);
      }
    }

    int[] destInteger = destAggs.getFieldsInteger();
    if (destInteger != null) {
      int[] srcInteger = srcAggs.getFieldsInteger();
      for (int i = 0; i < destInteger.length; i++) {
        destInteger[i] = Math.max(destInteger[i], srcInteger[i]);
      }
    }

    long[] destLong = destAggs.getFieldsLong();
    if (destLong != null) {
      long[] srcLong = srcAggs.getFieldsLong();
      for (int i = 0; i < destLong.length; i++) {
        destLong[i] = Math.max(destLong[i], srcLong[i]);
      }
    }

    // See note in aggregate(Aggregate, InputEvent) on NaN/-0.0 handling.
    float[] destFloat = destAggs.getFieldsFloat();
    if (destFloat != null) {
      float[] srcFloat = srcAggs.getFieldsFloat();
      for (int i = 0; i < destFloat.length; i++) {
        if (destFloat[i] < srcFloat[i]) {
          destFloat[i] = srcFloat[i];
        }
      }
    }

    double[] destDouble = destAggs.getFieldsDouble();
    if (destDouble != null) {
      double[] srcDouble = srcAggs.getFieldsDouble();
      for (int i = 0; i < destDouble.length; i++) {
        if (destDouble[i] < srcDouble[i]) {
          destDouble[i] = srcDouble[i];
        }
      }
    }
  }

  @Override
  public Type getOutputType(Type inputType)
  {
    // MAX is only defined over numeric fields.
    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
  }

  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    // MAX requires no additional meta data.
    return null;
  }
}
