http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java new file mode 100644 index 0000000..6639d3a --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java @@ -0,0 +1,807 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.appdata.schemas; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; +import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator; + +/** + * The {@link DimensionalSchema} class represents the App Data dimensions schema. The App Data dimensions + * schema is built from two sources: a {@link DimensionalConfigurationSchema} and an optional schema stub. The + * {@link DimensionalConfigurationSchema} is responsible for defining the key, values, dimensions combinations, + * and the aggregations performed for each dimensions combination. The schema stub defines the from and to + * times for the App Data dimensions schema. For details on how to define the {@link DimensionalConfigurationSchema} + * schema please the documentation for the {@link DimensionalConfigurationSchema} class. An example of a valid + * schema stub which defines the from and to times is below: + * <br/> + * <br/> + * {@code + * { + * "time": + * { + * "from":1123455556656, + * "to":382390859384 + * } + * } + * + * @since 3.1.0 + */ +public class DimensionalSchema implements Schema +{ + /** + * The type of the schema. + */ + public static final String SCHEMA_TYPE = "dimensions"; + /** + * The version of the schema. + */ + public static final String SCHEMA_VERSION = "1.0"; + /** + * The JSON key string corresponding to the from field. 
+ */ + public static final String FIELD_TIME_FROM = "from"; + /** + * The JSON key string corresponding to the time field. + */ + public static final String FIELD_TIME = "time"; + /** + * The JSON key string corresponding to the to field. + */ + public static final String FIELD_TIME_TO = "to"; + /** + * The JSON key string corresponding to the buckets field. + */ + public static final String FIELD_TIME_BUCKETS = "buckets"; + /** + * The JSON key string corresponding to the slidingAggregateSupported field. + */ + public static final String FIELD_SLIDING_AGGREGATE_SUPPORTED = "slidingAggregateSupported"; + /** + * The JSON key string used to identify the tags. + */ + //TODO To be removed when Malhar Library 3.3 becomes a dependency. + private static final String FIELD_TAGS = "tags"; + + public static final List<Fields> VALID_KEYS = ImmutableList.of(new Fields(Sets.newHashSet(FIELD_TIME))); + public static final List<Fields> VALID_TIME_KEYS = ImmutableList.of( + new Fields(Sets.newHashSet(FIELD_TIME_FROM, FIELD_TIME_TO))); + + /** + * The from value for the schema. Null if there is no from value. + */ + private Long from; + /** + * The to value for the schema. Null if there is no to value. + */ + private Long to; + /** + * boolean flag indicating if any values in the schema have been changed. + */ + private boolean changed = false; + /** + * boolean flag indicating if the from to fields in the schema have been changed. + */ + private boolean changedFromTo = false; + /** + * boolean flag indicating if the schema keys have been updated for the schema. + */ + private boolean changedSchemaKeys = false; + /** + * boolean flag indicating if the enum vals are updated. + */ + private boolean areEnumsUpdated = false; + /** + * The AppData schema JSON string (which is returned in the schema query). + */ + private String schemaJSON; + /** + * The {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema} was constructed. 
+ */ + private DimensionalConfigurationSchema configurationSchema; + /** + * The {@link JSONObject} representing the AppData dimensions schema. + */ + private JSONObject schema; + /** + * The {@link JSONObject} representing the time section of the AppData dimensions schema. + */ + private JSONObject time; + /** + * The {@link JSONObject} representing the keys section of the AppData dimensions schema. + */ + private JSONArray keys; + /** + * This flag is true if there was a from and to time defined for this schema initially. + */ + private boolean predefinedFromTo = false; + /** + * The schema keys for this schema. + */ + private Map<String, String> schemaKeys; + /** + * The current enum vals for this schema. + */ + private Map<String, List<Object>> currentEnumVals; + /** + * The schemaID assigned to this schema. This schemaID is only needed for operators + * which need to host multiple schemas. + */ + private int schemaID = Schema.DEFAULT_SCHEMA_ID; + + /** + * Constructor for serialization + */ + private DimensionalSchema() + { + //For kryo + } + + /** + * This creates a {@link DimensionalSchema} object from the given schema stub, + * configuration schema, and schema keys. + * + * @param schemaStub The schema stub to use when creating this {@link DimensionalSchema}. + * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}. + * @param schemaKeys The schemaKeys to use when creating this {@link DimensionalSchema}. + */ + public DimensionalSchema(String schemaStub, + DimensionalConfigurationSchema configurationSchema, + Map<String, String> schemaKeys) + { + this(configurationSchema, + schemaKeys); + + if (schemaStub != null) { + predefinedFromTo = true; + try { + setSchemaStub(schemaStub); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * This creates a {@link DimensionalSchema} object from the given schemaID, schemaStrub,configurationSchema, and + * schemaKeys. 
+ * + * @param schemaID The schemaID assigned to this schema. + * @param schemaStub The schema stub to use when creating this {@link DimensionalSchema}. + * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}. + * @param schemaKeys The schemaKeys to use when creating this {@link DimensionalSchema}. + */ + public DimensionalSchema(int schemaID, + String schemaStub, + DimensionalConfigurationSchema configurationSchema, + Map<String, String> schemaKeys) + { + this(schemaStub, + configurationSchema, + schemaKeys); + + this.schemaID = schemaID; + } + + /** + * This creates a {@link DimensionalSchema} from the given schemaStub and configuration schema. + * + * @param schemaStub The schema stub to use when creating this {@link DimensionalSchema}. + * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}. + */ + public DimensionalSchema(String schemaStub, + DimensionalConfigurationSchema configurationSchema) + { + this(schemaStub, configurationSchema, null); + } + + /** + * This creates a {@link DimensionalSchema} from the given schemaID, schemaStub, and + * configurationSchema. + * + * @param schemaID The schemaID assigned to this schema. + * @param schemaStub The schema stub to use when creating this {@link DimensionalSchema}. + * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}. + */ + public DimensionalSchema(int schemaID, + String schemaStub, + DimensionalConfigurationSchema configurationSchema) + { + this(schemaStub, + configurationSchema); + + this.schemaID = schemaID; + } + + /** + * Creates a {@link DimensionalSchema} from the given configuration schema and schema keys. + * + * @param configurationSchema The configuration schema from which to construct this {@link DimensionalEventSchema}. + * @param schemaKeys The schemaKeys assigned to this schema. 
+ */ + public DimensionalSchema(DimensionalConfigurationSchema configurationSchema, + Map<String, String> schemaKeys) + { + setConfigurationSchema(configurationSchema); + setSchemaKeys(schemaKeys); + + try { + initialize(); + } catch (JSONException e) { + throw new RuntimeException(e); + } + } + + /** + * Creates a {@link DimensionalSchema} object from the given schemaID, configurationSchema, + * and schemaKeys. + * + * @param schemaID The schemaID assigned to this schema. + * @param configurationSchema The configuration schema from which this schema was constructed. + * @param schemaKeys The schema keys assigned to this schema. + */ + public DimensionalSchema(int schemaID, + DimensionalConfigurationSchema configurationSchema, + Map<String, String> schemaKeys) + { + this(configurationSchema, + schemaKeys); + + this.schemaID = schemaID; + } + + /** + * Creates a {@link DimensionalSchema} object from the given configuration schema. + * + * @param configurationSchema The configuration schema from which to construct this + * schema. + */ + public DimensionalSchema(DimensionalConfigurationSchema configurationSchema) + { + this(configurationSchema, + null); + } + + /** + * Creates a {@link DimensionalSchema} object with the given schema ID and + * configuration schema. + * + * @param schemaID The schemaID assigned to this schema. + * @param configurationSchema The configuration schema from which this schema as constructed. + */ + public DimensionalSchema(int schemaID, + DimensionalConfigurationSchema configurationSchema) + { + this(configurationSchema); + this.schemaID = schemaID; + } + + /** + * Returns the aggregator registry assigned to this schema object. + * + * @return The aggregator registry. 
+ */ + public AggregatorRegistry getAggregatorRegistry() + { + return configurationSchema.getAggregatorRegistry(); + } + + @Override + public final void setSchemaKeys(Map<String, String> schemaKeys) + { + changed = true; + changedSchemaKeys = true; + + if (schemaKeys == null) { + this.schemaKeys = null; + return; + } + + for (Map.Entry<String, String> entry : schemaKeys.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + + this.schemaKeys = Maps.newHashMap(schemaKeys); + } + + /** + * This is a helper method for setting the configuration schema. + * + * @param configurationSchema The configuration schema. + */ + private void setConfigurationSchema(DimensionalConfigurationSchema configurationSchema) + { + this.configurationSchema = Preconditions.checkNotNull(configurationSchema, "eventSchema"); + } + + /** + * This is a helper method extracts and validates the information contained in the schema stub for this schema. + * + * @param schemaStub The schema stub to extract information from and validate. + * @throws JSONException This exception is thrown if there is an error processing the provided JSON schemaStub. + */ + private void setSchemaStub(String schemaStub) throws JSONException + { + JSONObject jo = new JSONObject(schemaStub); + SchemaUtils.checkValidKeysEx(jo, VALID_KEYS); + + JSONObject tempTime = jo.getJSONObject(FIELD_TIME); + SchemaUtils.checkValidKeys(jo, VALID_TIME_KEYS); + + this.from = tempTime.getLong(FIELD_TIME_FROM); + this.to = tempTime.getLong(FIELD_TIME_TO); + } + + /** + * Initializes the schema JSON and schema metadata. + * + * @throws JSONException This exception is thrown when there is an + * exception building the schema for the AppData dimensions schema. 
+ */ + private void initialize() throws JSONException + { + schema = new JSONObject(); + + if (schemaKeys != null) { + schema.put(Schema.FIELD_SCHEMA_KEYS, + SchemaUtils.createJSONObject(schemaKeys)); + } + + schema.put(SnapshotSchema.FIELD_SCHEMA_TYPE, DimensionalSchema.SCHEMA_TYPE); + schema.put(SnapshotSchema.FIELD_SCHEMA_VERSION, DimensionalSchema.SCHEMA_VERSION); + + if (!configurationSchema.getTags().isEmpty()) { + schema.put(FIELD_TAGS, new JSONArray(configurationSchema.getTags())); + } + + //time + time = new JSONObject(); + schema.put(FIELD_TIME, time); + JSONArray bucketsArray = new JSONArray(configurationSchema.getBucketsString()); + time.put(FIELD_TIME_BUCKETS, bucketsArray); + time.put(FIELD_SLIDING_AGGREGATE_SUPPORTED, true); + + //keys + keys = new JSONArray(configurationSchema.getKeysString()); + + for (int keyIndex = 0; keyIndex < keys.length(); keyIndex++) { + JSONObject keyJo = keys.getJSONObject(keyIndex); + String keyName = keyJo.getString(DimensionalConfigurationSchema.FIELD_KEYS_NAME); + List<String> tags = configurationSchema.getKeyToTags().get(keyName); + + if (!tags.isEmpty()) { + keyJo.put(FIELD_TAGS, new JSONArray(tags)); + } + } + + schema.put(DimensionalConfigurationSchema.FIELD_KEYS, keys); + + //values + JSONArray values = new JSONArray(); + schema.put(SnapshotSchema.FIELD_VALUES, values); + + FieldsDescriptor inputValuesDescriptor = configurationSchema.getInputValuesDescriptor(); + Map<String, Map<String, Type>> allValueToAggregator = configurationSchema.getSchemaAllValueToAggregatorToType(); + + for (Map.Entry<String, Map<String, Type>> entry : allValueToAggregator.entrySet()) { + String valueName = entry.getKey(); + + for (Map.Entry<String, Type> entryAggType : entry.getValue().entrySet()) { + String aggregatorName = entryAggType.getKey(); + Type outputValueType = entryAggType.getValue(); + + JSONObject value = new JSONObject(); + String combinedName = valueName + + DimensionalConfigurationSchema.ADDITIONAL_VALUE_SEPERATOR + + 
aggregatorName; + value.put(SnapshotSchema.FIELD_VALUES_NAME, combinedName); + value.put(SnapshotSchema.FIELD_VALUES_TYPE, outputValueType.getName()); + + List<String> tags = configurationSchema.getValueToTags().get(valueName); + + if (!tags.isEmpty()) { + value.put(FIELD_TAGS, new JSONArray(tags)); + } + + values.put(value); + } + } + + JSONArray dimensions = new JSONArray(); + + for (int combinationID = 0; + combinationID < configurationSchema.getDimensionsDescriptorIDToKeys().size(); + combinationID++) { + + Fields fields = configurationSchema.getDimensionsDescriptorIDToKeys().get(combinationID); + Map<String, Set<String>> fieldToAggregatorAdditionalValues = + configurationSchema.getDimensionsDescriptorIDToFieldToAggregatorAdditionalValues().get(combinationID); + + JSONObject combination = new JSONObject(); + JSONArray combinationArray = new JSONArray(); + + for (String field : fields.getFields()) { + combinationArray.put(field); + } + + combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_COMBINATIONS, combinationArray); + + if (!fieldToAggregatorAdditionalValues.isEmpty()) { + JSONArray additionalValueArray = new JSONArray(); + + for (Map.Entry<String, Set<String>> entry : fieldToAggregatorAdditionalValues.entrySet()) { + String valueName = entry.getKey(); + + for (String aggregatorName : entry.getValue()) { + JSONObject additionalValueObject = new JSONObject(); + String combinedName = valueName + + DimensionalConfigurationSchema.ADDITIONAL_VALUE_SEPERATOR + + aggregatorName; + Type inputValueType = inputValuesDescriptor.getType(valueName); + + if (!configurationSchema.getAggregatorRegistry().isAggregator(aggregatorName)) { + if (aggregatorName == null) { + LOG.error("{} is not a valid aggregator.", aggregatorName); + } + } + + Type outputValueType; + + if (configurationSchema.getAggregatorRegistry().isIncrementalAggregator(aggregatorName)) { + IncrementalAggregator aggregator + = 
configurationSchema.getAggregatorRegistry().getNameToIncrementalAggregator().get(aggregatorName); + + outputValueType = aggregator.getOutputType(inputValueType); + } else { + outputValueType = configurationSchema.getAggregatorRegistry().getNameToOTFAggregators().get( + aggregatorName).getOutputType(); + } + + additionalValueObject.put(DimensionalConfigurationSchema.FIELD_VALUES_NAME, combinedName); + additionalValueObject.put(DimensionalConfigurationSchema.FIELD_VALUES_TYPE, outputValueType.getName()); + additionalValueArray.put(additionalValueObject); + } + } + + combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_ADDITIONAL_VALUES, additionalValueArray); + } + + dimensions.put(combination); + } + + schema.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS, dimensions); + + this.schemaJSON = this.schema.toString(); + } + + /** + * Sets the from time for the schema. + * + * @param from The from time for the schema. + */ + public void setFrom(Long from) + { + this.from = from; + changed = true; + changedFromTo = true; + } + + /** + * Sets the to time for the schema. + * + * @param to The to time for the schema. + */ + public void setTo(Long to) + { + this.to = to; + changed = true; + changedFromTo = true; + } + + /** + * Sets the new enum lists for this schema. The sets in the provided maps are converted into lists. + * + * @param enums The new enum sets for this schema. + */ + public void setEnumsSet(Map<String, Set<Object>> enums) + { + Preconditions.checkNotNull(enums); + areEnumsUpdated = true; + + Map<String, List<Object>> enumsList = Maps.newHashMap(); + + //Check that all the given keys are valid + Preconditions.checkArgument( + configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()), + "The given map doesn't contain valid keys. 
Valid keys are %s and the provided keys are %s", + configurationSchema.getKeyDescriptor().getFields().getFields(), + enums.keySet()); + + //Todo check the type of the objects, for now just set them on the enum. + + for (Map.Entry<String, Set<Object>> entry : enums.entrySet()) { + String name = entry.getKey(); + Set<Object> vals = entry.getValue(); + + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(vals); + + for (Object value : entry.getValue()) { + Preconditions.checkNotNull(value); + } + + List<Object> valsList = Lists.newArrayList(vals); + enumsList.put(name, valsList); + } + + currentEnumVals = Maps.newHashMap(enumsList); + } + + /** + * Sets the new enum lists for this schema. The sets in the provided maps are converted into lists, and + * sorted according to their natural ordering. + * + * @param enums The new enum sets for this schema. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + public void setEnumsSetComparable(Map<String, Set<Comparable>> enums) + { + Preconditions.checkNotNull(enums); + areEnumsUpdated = true; + + Map<String, List<Object>> enumsList = Maps.newHashMap(); + + //Check that all the given keys are valid + Preconditions.checkArgument( + configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()), + "The given map doesn't contain valid keys. Valid keys are %s and the provided keys are %s", + configurationSchema.getKeyDescriptor().getFields().getFields(), + enums.keySet()); + + //Todo check the type of the objects, for now just set them on the enum. 
+ for (Map.Entry<String, Set<Comparable>> entry : enums.entrySet()) { + String name = entry.getKey(); + Set<Comparable> vals = entry.getValue(); + + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(vals); + + for (Object value : entry.getValue()) { + Preconditions.checkNotNull(value); + } + + List<Comparable> valsListComparable = Lists.newArrayList(vals); + Collections.sort(valsListComparable); + List<Object> valsList = (List)valsListComparable; + enumsList.put(name, valsList); + } + + currentEnumVals = Maps.newHashMap(enumsList); + } + + /** + * Sets the new enum lists for this schema. + * + * @param enums The new enum lists for this schema. + */ + public void setEnumsList(Map<String, List<Object>> enums) + { + Preconditions.checkNotNull(enums); + areEnumsUpdated = true; + + //Check that all the given keys are valid + Preconditions.checkArgument( + configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()), + "The given map doesn't contain valid keys. Valid keys are %s and the provided keys are %s", + configurationSchema.getKeyDescriptor().getFields().getFields(), + enums.keySet()); + + //Todo check the type of the objects, for now just set them on the enum. 
+ for (Map.Entry<String, List<Object>> entry : enums.entrySet()) { + Preconditions.checkNotNull(entry.getKey()); + Preconditions.checkNotNull(entry.getValue()); + } + + Map<String, List<Object>> tempEnums = Maps.newHashMap(); + + for (Map.Entry<String, List<Object>> entry : enums.entrySet()) { + String key = entry.getKey(); + List<?> enumValues = entry.getValue(); + List<Object> tempEnumValues = Lists.newArrayList(); + + for (Object enumValue : enumValues) { + tempEnumValues.add(enumValue); + } + + tempEnums.put(key, tempEnumValues); + } + + currentEnumVals = tempEnums; + } + + @Override + public String getSchemaJSON() + { + if (!changed && schemaJSON != null) { + //If there are no changes, return the pre computed JSON + return schemaJSON; + } + + if (changedSchemaKeys) { + //If the schema keys change, recompute the schema keys portion of the JSON + changedSchemaKeys = false; + + if (schemaKeys == null) { + schema.remove(Schema.FIELD_SCHEMA_KEYS); + } else { + try { + schema.put(Schema.FIELD_SCHEMA_KEYS, + SchemaUtils.createJSONObject(schemaKeys)); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } + } + } + + if (changedFromTo) { + //If the from to times have changed then recompute the time portion of the schema. + changedFromTo = false; + Preconditions.checkState(!(from == null ^ to == null), + "Either both from and to should be set or both should be not set."); + + if (from != null) { + Preconditions.checkState(to >= from, "to {} must be greater than or equal to from {}.", to, from); + } + + if (from == null) { + time.remove(FIELD_TIME_FROM); + time.remove(FIELD_TIME_TO); + } else { + try { + time.put(FIELD_TIME_FROM, from); + time.put(FIELD_TIME_TO, to); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } + } + } + + if (this.areEnumsUpdated) { + //If the enums have been updated, recompute the key portion of the schema. 
+ for (int keyIndex = 0; + keyIndex < keys.length(); + keyIndex++) { + JSONObject keyData; + String name; + + try { + keyData = keys.getJSONObject(keyIndex); + name = keyData.getString(DimensionalConfigurationSchema.FIELD_KEYS_NAME); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } + + List<Object> enumVals = currentEnumVals.get(name); + + if (enumVals == null || enumVals.isEmpty()) { + keyData.remove(DimensionalConfigurationSchema.FIELD_KEYS_ENUMVALUES); + continue; + } + + JSONArray newEnumValues = new JSONArray(); + + for (Object enumVal : enumVals) { + newEnumValues.put(enumVal); + } + + try { + keyData.put(DimensionalConfigurationSchema.FIELD_KEYS_ENUMVALUES, newEnumValues); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } + } + + this.areEnumsUpdated = false; + } + + //Rebuild the schema JSON string. + schemaJSON = schema.toString(); + return schemaJSON; + } + + /** + * Gets the {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema}. + * + * @return The {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema} was + * constructed. + */ + public DimensionalConfigurationSchema getDimensionalConfigurationSchema() + { + return configurationSchema; + } + + @Override + public String getSchemaType() + { + return SCHEMA_TYPE; + } + + @Override + public String getSchemaVersion() + { + return SCHEMA_VERSION; + } + + @Override + public Map<String, String> getSchemaKeys() + { + return schemaKeys; + } + + /** + * @return the predefinedFromTo + */ + public boolean isPredefinedFromTo() + { + return predefinedFromTo; + } + + /** + * Returns the schema ID for this schema. This is only relevant for operators which + * host multiple schemas. + * + * @return The schema ID for this schema. + */ + @Override + public int getSchemaID() + { + return schemaID; + } + + /** + * Returns the current enum vals for the schema. 
The current enum vals for the + * schema are expressed in a map whose keys are the names of the keys in the schema, and whose + * values are a list of possible values for the key. + * + * @return the currentEnumVals The current enum vals for the schema. + */ + public Map<String, List<Object>> getCurrentEnumVals() + { + return currentEnumVals; + } + + private static final Logger LOG = LoggerFactory.getLogger(DimensionalSchema.class); +}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java new file mode 100644 index 0000000..b12b631 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java @@ -0,0 +1,844 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions; + +import java.io.Serializable; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.dimensions.aggregator.AggregateEvent; + +/** + * <p> + * This is the base class for the events that are used for internal processing in the subclasses of + * {@link AbstractDimensionsComputationFlexible} and {@link DimensionsStoreHDHT}. 
+ * </p> + * <p> + * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} and a {@link GPOMutable} object + * which contains the values of aggregate fields. The {@link EventKey} is used to identify the dimension combination + * an event belongs to, and consequently determines what input values should be aggregated together. The aggregates + * are the actual data payload of the event which are to be aggregated. + * </p> + * + * @since 3.1.0 + */ +public class DimensionsEvent implements Serializable +{ + private static final long serialVersionUID = 201503231204L; + + /** + * This is the {@link GPOMutable} object which holds all the aggregates. + */ + protected GPOMutable aggregates; + /** + * This is the event key for the event. + */ + protected EventKey eventKey; + + /** + * Constructor for Kryo. + */ + private DimensionsEvent() + { + //For kryo + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and aggregates. + * + * @param eventKey The key from which to create a {@link DimensionsEvent}. + * @param aggregates The aggregates from which to create {@link DimensionsEvent}. + */ + public DimensionsEvent(EventKey eventKey, + GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys The values for fields in the key. + * @param aggregates The values for fields in the aggregate. + * @param bucketID The bucketID + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. 
+ */ + public DimensionsEvent(GPOMutable keys, + GPOMutable aggregates, + int bucketID, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, + schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + /** + * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. + * + * @param keys The value for fields in the key. + * @param aggregates The value for fields in the aggregate. + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. + */ + public DimensionsEvent(GPOMutable keys, + GPOMutable aggregates, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + /** + * This is a helper method which sets the {@link EventKey} of the event to + * be the same as the given {@link EventKey}. + * + * @param eventKey The {@link EventKey} to set on this event. + */ + protected final void setEventKey(EventKey eventKey) + { + this.eventKey = new EventKey(eventKey); + } + + /** + * This is a helper method which sets the aggregates for this event. + * + * @param aggregates The aggregates for this event. + */ + protected final void setAggregates(GPOMutable aggregates) + { + Preconditions.checkNotNull(aggregates); + this.aggregates = aggregates; + } + + /** + * This is a helper method which returns the aggregates for this event. + * + * @return The helper method which returns the aggregates for this event. + */ + public GPOMutable getAggregates() + { + return aggregates; + } + + /** + * Returns the {@link EventKey} for this event. + * + * @return The {@link EventKey} for this event. 
+ */ + public EventKey getEventKey() + { + return eventKey; + } + + /** + * This is a convenience method which returns the values of the key fields in this event's + * {@link EventKey}. + * + * @return The values of the key fields in this event's {@link EventKey}. + */ + public GPOMutable getKeys() + { + return eventKey.getKey(); + } + + /** + * This is a convenience method which returns the schemaID of this event's {@link EventKey}. + * + * @return The schemaID of this event's {@link EventKey}. + */ + public int getSchemaID() + { + return eventKey.getSchemaID(); + } + + /** + * Returns the id of the dimension descriptor (key combination) for which this event contains data. + * + * @return The id of the dimension descriptor (key combination) for which this event contains data. + */ + public int getDimensionDescriptorID() + { + return eventKey.getDimensionDescriptorID(); + } + + /** + * Returns the id of the aggregator which is applied to this event's data. + * + * @return Returns the id of the aggregator which is applied to this event's data. + */ + public int getAggregatorID() + { + return eventKey.getAggregatorID(); + } + + /** + * Returns the bucketID assigned to this event. The bucketID is useful for this event in the case that the event + * is sent to a partitioned HDHT operator. Each partitioned HDHT operator can use the bucketIDs for the buckets it + * writes to as a partition key. + * + * @return The bucketID assigned to this event. + */ + public int getBucketID() + { + return eventKey.getBucketID(); + } + + /** + * This is a utility method which copies the given src event to the given destination event. + * + * @param aeDest The destination event. + * @param aeSrc The source event. 
+ */ + public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc) + { + GPOMutable destAggs = aeDest.getAggregates(); + GPOMutable srcAggs = aeSrc.getAggregates(); + + if (srcAggs.getFieldsBoolean() != null) { + System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0, + srcAggs.getFieldsBoolean().length); + } + + if (srcAggs.getFieldsCharacter() != null) { + System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0, + srcAggs.getFieldsCharacter().length); + } + + if (srcAggs.getFieldsString() != null) { + System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length); + } + + if (srcAggs.getFieldsShort() != null) { + System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length); + } + + if (srcAggs.getFieldsInteger() != null) { + System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0, + srcAggs.getFieldsInteger().length); + } + + if (srcAggs.getFieldsLong() != null) { + System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length); + } + + if (srcAggs.getFieldsFloat() != null) { + System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length); + } + + if (srcAggs.getFieldsDouble() != null) { + System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length); + } + } + + /** + * <p> + * The {@link EventKey} represents a dimensions combination for a dimensions event. It contains the keys and values + * which define a dimensions combination. It's very similar to a {@link DimensionsDescriptor} which is also used to + * define part of a dimensions combination. The difference between the two is that a {@link DimensionsDescriptor} + * only contains what + * keys are included in the combination, not the values of those keys (which the {@link EventKey} has. 
+ * </p> + * <p> + * In addition to holding the keys in a dimensions combination and their values, the event key holds some meta + * information. + * The meta information included and their purposes are the following: + * <ul> + * <li><b>bucketID:</b> This is set when the dimension store responsible for storing the data is partitioned. In that + * case the bucketID is used as the partitionID.</li> + * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that this {@link EventKey} corresponds to + * .</li> + * <li><b>dimensionDescriptorID:</b> This is the id of the {@link DimensionsDescriptor} that this {@link EventKey} + * corresponds to.</li> + * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to aggregate the values associated with + * this {@link EventKey} + * in a {@link DimensionsEvent}.</li> + * </ul> + * </p> + */ + public static class EventKey implements Serializable + { + private static final long serialVersionUID = 201503231205L; + + /** + * The bucketID assigned to this event key. + */ + private int bucketID; + /** + * The schemaID corresponding to the {@link DimensionalSchema} that this {@link EventKey} + * corresponds to. + */ + private int schemaID; + /** + * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the corresponding {@link DimensionalSchema}. + */ + private int dimensionDescriptorID; + /** + * The id of the aggregator which should be used to aggregate the values corresponding to this + * {@link EventKey}. + */ + private int aggregatorID; + /** + * The values of the key fields. + */ + private GPOMutable key; + + /** + * Constructor for serialization. + */ + private EventKey() + { + //For kryo + } + + /** + * Copy constructor. + * + * @param eventKey The {@link EventKey} whose data will be copied. 
+ */ + public EventKey(EventKey eventKey) + { + this.bucketID = eventKey.bucketID; + this.schemaID = eventKey.schemaID; + this.dimensionDescriptorID = eventKey.dimensionDescriptorID; + this.aggregatorID = eventKey.aggregatorID; + + this.key = new GPOMutable(eventKey.getKey()); + } + + /** + * Creates an event key with the given data. + * + * @param bucketID The bucketID assigned to this {@link EventKey}. + * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}. + * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding + * {@link DimensionDescriptor} in the {@link DimensionalSchema}. + * @param aggregatorID The id of the aggregator which should be used to aggregate the values + * corresponding to this + * {@link EventKey}. + * @param key The values of the keys. + */ + public EventKey(int bucketID, + int schemaID, + int dimensionDescriptorID, + int aggregatorID, + GPOMutable key) + { + setBucketID(bucketID); + setSchemaID(schemaID); + setDimensionDescriptorID(dimensionDescriptorID); + setAggregatorID(aggregatorID); + setKey(key); + } + + /** + * Creates an event key with the given data. This constructor assumes that the bucketID will be 0. + * + * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}. + * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding {@link DimensionDescriptor}. + * @param aggregatorID The id of the aggregator which should be used to aggregate the values + * corresponding to this + * {@link EventKey}. + * @param key The values of the keys. + */ + public EventKey(int schemaID, + int dimensionDescriptorID, + int aggregatorID, + GPOMutable key) + { + setSchemaID(schemaID); + setDimensionDescriptorID(dimensionDescriptorID); + setAggregatorID(aggregatorID); + setKey(key); + } + + /** + * Sets the dimension descriptor ID. + * + * @param dimensionDescriptorID The dimension descriptor ID to set. 
+ */ + private void setDimensionDescriptorID(int dimensionDescriptorID) + { + this.dimensionDescriptorID = dimensionDescriptorID; + } + + /** + * Returns the dimension descriptor ID. + * + * @return The dimension descriptor ID. + */ + public int getDimensionDescriptorID() + { + return dimensionDescriptorID; + } + + /** + * Returns the aggregatorID. + * + * @return The aggregatorID. + */ + public int getAggregatorID() + { + return aggregatorID; + } + + /** + * Sets the aggregatorID. + * + * @param aggregatorID The aggregatorID to set. + */ + private void setAggregatorID(int aggregatorID) + { + this.aggregatorID = aggregatorID; + } + + /** + * Returns the schemaID. + * + * @return The schemaID to set. + */ + public int getSchemaID() + { + return schemaID; + } + + /** + * Sets the schemaID. + * + * @param schemaID The schemaID to set. + */ + private void setSchemaID(int schemaID) + { + this.schemaID = schemaID; + } + + /** + * Returns the key values. + * + * @return The key values. + */ + public GPOMutable getKey() + { + return key; + } + + /** + * Sets the bucektID. + * + * @param bucketID The bucketID. + */ + private void setBucketID(int bucketID) + { + this.bucketID = bucketID; + } + + /** + * Gets the bucketID. + * + * @return The bucketID. + */ + public int getBucketID() + { + return bucketID; + } + + /** + * Sets the key values. + * + * @param key The key values to set. + */ + private void setKey(GPOMutable key) + { + Preconditions.checkNotNull(key); + this.key = key; + } + + @Override + public int hashCode() + { + int hash = 3; + hash = 97 * hash + this.bucketID; + hash = 97 * hash + this.schemaID; + hash = 97 * hash + this.dimensionDescriptorID; + hash = 97 * hash + this.aggregatorID; + hash = 97 * hash + (this.key != null ? 
this.key.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + final EventKey other = (EventKey)obj; + + if (this.bucketID != other.bucketID) { + return false; + } + + if (this.schemaID != other.schemaID) { + return false; + } + + if (this.dimensionDescriptorID != other.dimensionDescriptorID) { + return false; + } + + if (this.aggregatorID != other.aggregatorID) { + return false; + } + + if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) { + return false; + } + + return true; + } + + @Override + public String toString() + { + return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID + + ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}'; + } + + public static List<EventKey> createEventKeys(int schemaId, + int dimensionsDescriptorId, + int aggregatorId, + List<GPOMutable> keys) + { + List<EventKey> eventKeys = Lists.newArrayList(); + + for (GPOMutable key : keys) { + eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key)); + } + + return eventKeys; + } + } + + @Override + public int hashCode() + { + int hash = 5; + hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0); + hash = 79 * hash + (this.eventKey != null ? 
this.eventKey.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) { + return false; + } + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + + public static class InputEvent extends DimensionsEvent + { + private static final long serialVersionUID = 201506210406L; + public boolean used = false; + + private InputEvent() + { + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and aggregates. + * + * @param eventKey The key from which to create a {@link DimensionsEvent}. + * @param aggregates The aggregates from which to create {@link DimensionsEvent}. + */ + public InputEvent(EventKey eventKey, + GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys The values for fields in the key. + * @param aggregates The values for fields in the aggregate. + * @param bucketID The bucketID + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. + */ + public InputEvent(GPOMutable keys, + GPOMutable aggregates, + int bucketID, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, + schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + /** + * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. + * + * @param keys The value for fields in the key. 
+ * @param aggregates The value for fields in the aggregate. + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. + */ + public InputEvent(GPOMutable keys, + GPOMutable aggregates, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + @Override + public int hashCode() + { + return GPOUtils.hashcode(this.getKeys()); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + } + + public static class Aggregate extends DimensionsEvent implements AggregateEvent + { + private static final long serialVersionUID = 201506190110L; + + /** + * This is the aggregatorIndex assigned to this event. + */ + protected int aggregatorIndex; + private GPOMutable metaData; + + public Aggregate() + { + //for kryo and for extending classes + } + + /** + * This creates a {@link DimensionsEvent} from the given event key and aggregates. + * + * @param eventKey The key from which to create a {@link DimensionsEvent}. + * @param aggregates The aggregates from which to create {@link DimensionsEvent}. + */ + public Aggregate(EventKey eventKey, + GPOMutable aggregates) + { + setEventKey(eventKey); + setAggregates(aggregates); + } + + public Aggregate(EventKey eventKey, + GPOMutable aggregates, + GPOMutable metaData) + { + super(eventKey, + aggregates); + + this.metaData = metaData; + } + + /** + * Creates a DimensionsEvent with the given key values, aggregates and ids. + * + * @param keys The values for fields in the key. 
+ * @param aggregates The values for fields in the aggregate. + * @param bucketID The bucketID + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. + */ + public Aggregate(GPOMutable keys, + GPOMutable aggregates, + int bucketID, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(bucketID, + schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + public Aggregate(GPOMutable keys, + GPOMutable aggregates, + GPOMutable metaData, + int bucketID, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this(keys, + aggregates, + bucketID, + schemaID, + dimensionDescriptorID, + aggregatorIndex); + + this.metaData = metaData; + } + + /** + * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. + * + * @param keys The value for fields in the key. + * @param aggregates The value for fields in the aggregate. + * @param schemaID The schemaID. + * @param dimensionDescriptorID The dimensionsDescriptorID. + * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. 
+ */ + public Aggregate(GPOMutable keys, + GPOMutable aggregates, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this.eventKey = new EventKey(schemaID, + dimensionDescriptorID, + aggregatorIndex, + keys); + setAggregates(aggregates); + } + + public Aggregate(GPOMutable keys, + GPOMutable aggregates, + GPOMutable metaData, + int schemaID, + int dimensionDescriptorID, + int aggregatorIndex) + { + this(keys, + aggregates, + schemaID, + dimensionDescriptorID, + aggregatorIndex); + + this.metaData = metaData; + } + + public void setMetaData(GPOMutable metaData) + { + this.metaData = metaData; + } + + public GPOMutable getMetaData() + { + return metaData; + } + + public void setAggregatorIndex(int aggregatorIndex) + { + this.aggregatorIndex = aggregatorIndex; + } + + @Override + public int getAggregatorIndex() + { + return aggregatorIndex; + } + + @Override + public int hashCode() + { + return GPOUtils.hashcode(this.getKeys()); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final DimensionsEvent other = (DimensionsEvent)obj; + + if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java new file mode 100644 index 0000000..f92bfbf --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java @@ -0,0 +1,190 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; +import com.datatorrent.lib.dimensions.DimensionsConversionContext; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * * <p> + * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a + * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for + * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an + * {@link IncrementalAggregator}. + * </p> + * <p> + * {@link IncrementalAggregator}s are intended to be used with subclasses of + * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. 
The way in which + * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator + * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which + * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to + * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields, + * one for cost and one for revenue. + * </p> + * + */ +public abstract class AbstractIncrementalAggregator implements IncrementalAggregator +{ + private static final long serialVersionUID = 201506211153L; + + /** + * The conversion context for this aggregator. + */ + protected DimensionsConversionContext context; + + public AbstractIncrementalAggregator() + { + } + + @Override + public void setDimensionsConversionContext(DimensionsConversionContext context) + { + this.context = Preconditions.checkNotNull(context); + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + src.used = true; + Aggregate aggregate = createAggregate(src, + context, + aggregatorIndex); + return aggregate; + } + + @Override + public int hashCode(InputEvent inputEvent) + { + long timestamp = -1L; + boolean hasTime = this.context.inputTimestampIndex != -1 + && this.context.outputTimebucketIndex != -1; + + if (hasTime) { + timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]; + inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] + = this.context.dd.getCustomTimeBucket().roundDown(timestamp); + } + + int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys); + + if (hasTime) { + inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp; + } + + return hashCode; + } + + @Override + public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2) + { + long timestamp1 = 0; 
+ long timestamp2 = 0; + + if (context.inputTimestampIndex != -1) { + timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex]; + inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = + context.dd.getCustomTimeBucket().roundDown(timestamp1); + + timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex]; + inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = + context.dd.getCustomTimeBucket().roundDown(timestamp2); + } + + boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(), + inputEvent1.getKeys(), + context.indexSubsetKeys); + + if (context.inputTimestampIndex != -1) { + inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1; + inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2; + } + + return equals; + } + + /** + * Creates an {@link Aggregate} from the given {@link InputEvent}. + * + * @param inputEvent The {@link InputEvent} to unpack into an {@link Aggregate}. + * @param context The conversion context required to transform the {@link InputEvent} into + * the correct {@link Aggregate}. + * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}. + * @return The converted {@link Aggregate}. + */ + public static Aggregate createAggregate(InputEvent inputEvent, + DimensionsConversionContext context, + int aggregatorIndex) + { + GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); + EventKey eventKey = createEventKey(inputEvent, + context, + aggregatorIndex); + + Aggregate aggregate = new Aggregate(eventKey, + aggregates); + aggregate.setAggregatorIndex(aggregatorIndex); + + return aggregate; + } + + /** + * Creates an {@link EventKey} from the given {@link InputEvent}. + * + * @param inputEvent The {@link InputEvent} to extract an {@link EventKey} from. + * @param context The conversion context required to extract the {@link EventKey} from + * the given {@link InputEvent}. 
+ * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}. + * @return The {@link EventKey} extracted from the given {@link InputEvent}. + */ + public static EventKey createEventKey(InputEvent inputEvent, + DimensionsConversionContext context, + int aggregatorIndex) + { + GPOMutable keys = new GPOMutable(context.keyDescriptor); + GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys); + + if (context.outputTimebucketIndex >= 0) { + CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket(); + + keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId( + timeBucket); + keys.getFieldsLong()[context.outputTimestampIndex] = + timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]); + } + + EventKey eventKey = new EventKey(context.schemaID, + context.dimensionsDescriptorID, + context.aggregatorID, + keys); + + return eventKey; + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java new file mode 100644 index 0000000..c15bf25 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.Fields; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This is the average {@link OTFAggregator}. + * + * @since 3.1.0 + */ +@Name("AVG") +public class AggregatorAverage implements OTFAggregator +{ + private static final long serialVersionUID = 20154301644L; + + /** + * The array index of the sum aggregates in the argument list of the {@link #aggregate} function. + */ + public static int SUM_INDEX = 0; + /** + * The array index of the count aggregates in the argument list of the {@link #aggregate} function. + */ + public static int COUNT_INDEX = 1; + /** + * The singleton instance of this class. + */ + public static final AggregatorAverage INSTANCE = new AggregatorAverage(); + + /** + * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on. + */ + public static final transient List<Class<? 
extends IncrementalAggregator>> CHILD_AGGREGATORS = + ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(), + AggregatorIncrementalType.COUNT.getAggregator().getClass()); + + /** + * Constructor for singleton pattern. + */ + protected AggregatorAverage() + { + //Do nothing + } + + @Override + public List<Class<? extends IncrementalAggregator>> getChildAggregators() + { + return CHILD_AGGREGATORS; + } + + @Override + public GPOMutable aggregate(GPOMutable... aggregates) + { + Preconditions.checkArgument(aggregates.length == getChildAggregators().size(), + "The number of arguments " + aggregates.length + + " should be the same as the number of child aggregators " + getChildAggregators().size()); + + GPOMutable sumAggregation = aggregates[SUM_INDEX]; + GPOMutable countAggregation = aggregates[COUNT_INDEX]; + + FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor(); + Fields fields = fieldsDescriptor.getFields(); + GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this)); + + long count = countAggregation.getFieldsLong()[0]; + + for (String field : fields.getFields()) { + Type type = sumAggregation.getFieldDescriptor().getType(field); + + switch (type) { + case BYTE: { + double val = ((double)sumAggregation.getFieldByte(field)) / + ((double)count); + result.setField(field, val); + break; + } + case SHORT: { + double val = ((double)sumAggregation.getFieldShort(field)) / + ((double)count); + result.setField(field, val); + break; + } + case INTEGER: { + double val = ((double)sumAggregation.getFieldInt(field)) / + ((double)count); + result.setField(field, val); + break; + } + case LONG: { + double val = ((double)sumAggregation.getFieldLong(field)) / + ((double)count); + result.setField(field, val); + break; + } + case FLOAT: { + double val = sumAggregation.getFieldFloat(field) / + ((double)count); + result.setField(field, val); + break; + } + case DOUBLE: { + double val = 
sumAggregation.getFieldDouble(field) / + ((double)count); + result.setField(field, val); + break; + } + default: { + throw new UnsupportedOperationException("The type " + type + " is not supported."); + } + } + } + + return result; + } + + @Override + public Type getOutputType() + { + return Type.DOUBLE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java new file mode 100644 index 0000000..8566e1c --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered. + * + * @since 3.1.0 + */ +@Name("COUNT") +public class AggregatorCount extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301645L; + + /** + * This is a map whose keys represent input types and whose values + * represent the corresponding output types. 
+ */ + public static final transient Map<Type, Type> TYPE_CONVERSION_MAP; + + static { + Map<Type, Type> typeConversionMap = Maps.newHashMap(); + + for (Type type : Type.values()) { + typeConversionMap.put(type, Type.LONG); + } + + TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap); + } + + public AggregatorCount() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + src.used = true; + GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); + GPOMutable keys = new GPOMutable(context.keyDescriptor); + GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys); + + EventKey eventKey = createEventKey(src, + context, + aggregatorIndex); + + long[] longFields = aggregates.getFieldsLong(); + + for (int index = 0; + index < longFields.length; + index++) { + longFields[index] = 0; + } + + return new Aggregate(eventKey, + aggregates); + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + long[] fieldsLong = dest.getAggregates().getFieldsLong(); + + for (int index = 0; + index < fieldsLong.length; + index++) { + //increment count + fieldsLong[index]++; + } + } + + @Override + public void aggregate(Aggregate destAgg, Aggregate srcAgg) + { + long[] destLongs = destAgg.getAggregates().getFieldsLong(); + long[] srcLongs = srcAgg.getAggregates().getFieldsLong(); + + for (int index = 0; + index < destLongs.length; + index++) { + //aggregate count + destLongs[index] += srcLongs[index]; + } + } + + @Override + public Type getOutputType(Type inputType) + { + return TYPE_CONVERSION_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java new file mode 100644 index 0000000..f5924b8 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.gpo.Serde; +import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor; +import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable; +import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +@Name("CUM_SUM") +/** + * @since 3.1.0 + */ + +public class AggregatorCumSum extends AggregatorSum +{ + private static final long serialVersionUID = 201506280518L; + + public static final int KEY_FD_INDEX = 0; + public static final int AGGREGATE_FD_INDEX = 1; + public static final int KEYS_INDEX = 2; + public static final int AGGREGATES_INDEX = 3; + + public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR; + + static { + Map<String, Type> fieldToType = Maps.newHashMap(); + fieldToType.put("fdkeys", Type.OBJECT); + fieldToType.put("fdvalues", Type.OBJECT); + fieldToType.put("keys", Type.OBJECT); + fieldToType.put("values", Type.OBJECT); + + Map<String, Serde> fieldToSerde = Maps.newHashMap(); + fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE); + fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE); + fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE); + fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE); + + META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType, + fieldToSerde, + new PayloadFix()); + } + + public AggregatorCumSum() + { + } + + @Override + public Aggregate getGroup(InputEvent src, int 
aggregatorIndex) + { + src.used = true; + Aggregate agg = createAggregate(src, + context, + aggregatorIndex); + + GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + GPOMutable metaData = new GPOMutable(getMetaDataDescriptor()); + + GPOMutable fullKey = new GPOMutable(src.getKeys()); + + if (context.inputTimestampIndex >= 0) { + fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L; + } + + List<GPOMutable> keys = Lists.newArrayList(fullKey); + + GPOMutable value = new GPOMutable(agg.getAggregates()); + List<GPOMutable> values = Lists.newArrayList(value); + + metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor(); + metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor(); + metaData.getFieldsObject()[KEYS_INDEX] = keys; + metaData.getFieldsObject()[AGGREGATES_INDEX] = values; + agg.setMetaData(metaData); + + return agg; + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + @SuppressWarnings("unchecked") + List<GPOMutable> destKeys = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> destAggregates = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + long timestamp = 0L; + + if (context.inputTimestampIndex >= 0) { + timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex]; + src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L; + } + + if (!contains(destKeys, src.getKeys())) { + destKeys.add(new GPOMutable(src.getKeys())); + + GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); + GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates); + + destAggregates.add(aggregates); + + this.aggregateAggs(dest.getAggregates(), aggregates); + } + + if (context.inputTimestampIndex >= 0) { + src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp; + } + } + + @Override + public 
void aggregate(Aggregate dest, Aggregate src) + { + dest.getMetaData().applyObjectPayloadFix(); + src.getMetaData().applyObjectPayloadFix(); + + @SuppressWarnings("unchecked") + List<GPOMutable> destKeys = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> srcKeys = + (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> destAggregates = + (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> srcAggregates = + (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; + + List<GPOMutable> newKeys = Lists.newArrayList(); + List<GPOMutable> newAggs = Lists.newArrayList(); + + for (int index = 0; + index < srcKeys.size(); + index++) { + GPOMutable currentSrcKey = srcKeys.get(index); + GPOMutable currentSrcAgg = srcAggregates.get(index); + + if (!contains(destKeys, currentSrcKey)) { + newKeys.add(currentSrcKey); + newAggs.add(currentSrcAgg); + + this.aggregateAggs(dest.getAggregates(), currentSrcAgg); + } + } + + destKeys.addAll(newKeys); + destAggregates.addAll(newAggs); + } + + private boolean contains(List<GPOMutable> mutables, GPOMutable mutable) + { + for (int index = 0; + index < mutables.size(); + index++) { + GPOMutable mutableFromList = mutables.get(index); + + if (GPOUtils.equals(mutableFromList, mutable)) { + return true; + } + } + + return false; + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return META_DATA_FIELDS_DESCRIPTOR; + } + + public static class PayloadFix implements SerdeObjectPayloadFix + { + @Override + public void fix(Object[] objects) + { + FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX]; + FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX]; + + @SuppressWarnings("unchecked") + List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX]; + 
@SuppressWarnings("unchecked") + List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX]; + + fix(keyfd, keyMutables); + fix(valuefd, aggregateMutables); + } + + private void fix(FieldsDescriptor fd, List<GPOMutable> mutables) + { + for (int index = 0; + index < mutables.size(); + index++) { + mutables.get(index).setFieldDescriptor(fd); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java new file mode 100644 index 0000000..e1bf7d4 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; +import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; +import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; + +/** + * <p> + * This aggregator creates an aggregate out of the first {@link InputEvent} encountered by this aggregator. All + * subsequent + * {@link InputEvent}s are ignored. + * </p> + * <p> + * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so + * one is picked arbitrarily to be the first. + * </p> + * + * @since 3.1.0 + */ +@Name("FIRST") +public class AggregatorFirst extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301646L; + + public AggregatorFirst() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + Aggregate aggregate = super.getGroup(src, aggregatorIndex); + + GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + return aggregate; + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType); + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + //Ignore + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + //Ignore + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java new file mode 100644 index 0000000..09190e1 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * @since 3.1.0 + */ + +public enum AggregatorIncrementalType +{ + SUM(new AggregatorSum()), + MIN(new AggregatorMin()), + MAX(new AggregatorMax()), + COUNT(new AggregatorCount()), + LAST(new AggregatorLast()), + FIRST(new AggregatorFirst()), + CUM_SUM(new AggregatorCumSum()); + + public static final Map<String, Integer> NAME_TO_ORDINAL; + public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR; + + private IncrementalAggregator aggregator; + + static { + Map<String, Integer> nameToOrdinal = Maps.newHashMap(); + Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap(); + + for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) { + nameToOrdinal.put(aggType.name(), aggType.ordinal()); + nameToAggregator.put(aggType.name(), aggType.getAggregator()); + } + + NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal); + NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator); + } + + AggregatorIncrementalType(IncrementalAggregator aggregator) + { + setAggregator(aggregator); + } + + private void setAggregator(IncrementalAggregator aggregator) + { + Preconditions.checkNotNull(aggregator); + this.aggregator = aggregator; + } + + public IncrementalAggregator getAggregator() + { + return aggregator; + } + + private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java 
b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
new file mode 100644
index 0000000..f727036
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
@@ -0,0 +1,84 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.dimensions.aggregator;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.dimensions.DimensionsEvent;
import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;

/**
 * <p>
 * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous
 * {@link InputEvent}s are ignored.
 * </p>
 * <p>
 * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
 * one is picked arbitrarily to be the last.
 * </p>
 *
 * @since 3.1.0
 */
@Name("LAST")
public class AggregatorLast extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301647L;

  public AggregatorLast()
  {
    //Do nothing
  }

  /**
   * Builds the group's aggregate seeded from the triggering event's values;
   * later events overwrite it via {@link #aggregate(Aggregate, InputEvent)}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    Aggregate lastAggregate = super.getGroup(src, aggregatorIndex);
    GPOUtils.indirectCopy(lastAggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
    return lastAggregate;
  }

  /**
   * The output type matches the input type exactly.
   */
  @Override
  public Type getOutputType(Type inputType)
  {
    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
  }

  /**
   * Each incoming event replaces the stored values, so whatever arrived most
   * recently wins.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
  }

  /**
   * When combining two aggregates, one is arbitrarily treated as "last": the
   * source is copied over the destination wholesale.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    DimensionsEvent.copy(dest, src);
  }

  /**
   * @return null — this aggregator carries no extra metadata
   */
  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return null;
  }
}
